From 46e6061d63562ce021ef8f3b5062d3eba1c2db4e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 17 Nov 2006 13:46:39 +0000
Subject: [PATCH] The synchronization changelog monitoring information has a counter named waiting-changes that publish the number of updates known by the changelog server that have not yest been sent to each ldap server because they are too slow to replay them.
---
opends/resource/schema/02-config.ldif | 7 +
opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java | 60 +++++----
opends/resource/config/synchronization.ldif | 2
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java | 13 -
opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java | 18 +--
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java | 57 ---------
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 20 +++
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java | 17 ++
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java | 46 ++++++-
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java | 4
opends/src/server/org/opends/server/synchronization/common/ServerState.java | 11 +
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 65 ++++++++--
opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java | 7
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java | 2
15 files changed, 194 insertions(+), 137 deletions(-)
diff --git a/opends/resource/config/synchronization.ldif b/opends/resource/config/synchronization.ldif
index 6336c65..8924b3c 100644
--- a/opends/resource/config/synchronization.ldif
+++ b/opends/resource/config/synchronization.ldif
@@ -14,7 +14,7 @@
objectClass: top
objectClass: ds-cfg-synchronization-provider
ds-cfg-synchronization-provider-enabled: true
-ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization
+ds-cfg-synchronization-provider-class: org.opends.server.synchronization.plugin.MultimasterSynchronization
dn: cn=example, cn=Multimaster Synchronization,cn=Synchronization Providers,cn=config
objectClass: top
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 70f64a4..c05900b 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -525,6 +525,10 @@
NAME 'ds-cfg-changelog-port'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.290
+ NAME 'ds-cfg-changelog-max-queue-size'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+ SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.278
NAME 'ds-cfg-changelog-server-id'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -1303,7 +1307,8 @@
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
'ds-cfg-synchronization-changelog-server-config' SUP top
STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
- MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
+ MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
+ ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index b90ea1d..2a74fb9 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -98,11 +98,13 @@
new ArrayList<ConfigAttribute>();
private ChangelogDbEnv dbEnv;
private int rcvWindow;
+ private int queueSize;
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
+ static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
static final IntegerConfigAttribute changelogPortStub =
new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -122,6 +124,10 @@
new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
false, false, false, true, 0, false, 0);
+ static final IntegerConfigAttribute queueSizeStub =
+ new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
+ false, false, false, true, 0, false, 0);
+
/**
* Check if a ConfigEntry is valid.
* @param config The config entry that needs to be checked.
@@ -247,6 +253,16 @@
configAttributes.add(windowAttr);
}
+ IntegerConfigAttribute queueSizeAttr =
+ (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub);
+ if (queueSizeAttr == null)
+ queueSize = 10000; // Attribute is not present : use the default value
+ else
+ {
+ queueSize = queueSizeAttr.activeIntValue();
+ configAttributes.add(queueSizeAttr);
+ }
+
initialize(changelogServerId, changelogPort);
configDn = config.getDN();
@@ -325,7 +341,7 @@
newSocket = listenSocket.accept();
newSocket.setReceiveBufferSize(1000000);
ServerHandler handler = new ServerHandler(
- new SocketSession(newSocket));
+ new SocketSession(newSocket), queueSize);
handler.start(null, serverId, serverURL, rcvWindow, this);
} catch (IOException e)
{
@@ -401,7 +417,7 @@
socket.connect(ServerAddr, 500);
ServerHandler handler = new ServerHandler(
- new SocketSession(socket));
+ new SocketSession(socket), queueSize);
handler.start(baseDn, serverId, serverURL, rcvWindow, this);
}
catch (IOException e)
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 10663dc..a8785c3 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -85,6 +85,7 @@
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
+ private int maxQueueSize = 10000;
private int restartReceiveQueue;
private int restartSendQueue;
private int restartReceiveDelay;
@@ -109,13 +110,16 @@
/**
* Creates a new server handler instance with the provided socket.
*
- * @param session The ProtocolSession used by the ServerHandler to
+ * @param session The ProtocolSession used by the ServerHandler to
* communicate with the remote entity.
+ * @param queueSize The maximum number of update that will be kept
+ * in memory by this ServerHandler.
*/
- public ServerHandler(ProtocolSession session)
+ public ServerHandler(ProtocolSession session, int queueSize)
{
super("Server Handler");
this.session = session;
+ this.maxQueueSize = queueSize;
}
/**
@@ -467,19 +471,50 @@
{
synchronized (msgQueue)
{
- /*
- * TODO : When the server is not able to follow, the msgQueue
- * may become too large and therefore won't contain all the
- * changes. Some changes may only be stored in the backing DB
- * of the servers.
- * The calculation should be done by asking to the each dbHandler
- * how many changes need to be replicated and making the sum
- * For now just return maxint in this case
- */
+ /*
+ * When the server is up to date or close to be up to date,
+ * the number of updates to be sent is the size of the receive queue.
+ */
if (isFollowing())
return msgQueue.size();
else
- return Integer.MAX_VALUE;
+ {
+ /*
+ * When the server is not able to follow, the msgQueue
+ * may become too large and therefore won't contain all the
+ * changes. Some changes may only be stored in the backing DB
+ * of the servers.
+ * The total size of teh receieve queue is calculated by doing
+ * the sum of the number of missing changes for every dbHandler.
+ */
+ int totalCount = 0;
+ ServerState dbState = changelogCache.getDbServerState();
+ for (short id : dbState)
+ {
+ int max = dbState.getMaxChangeNumber(id).getSeqnum();
+ ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
+ if (currentChange != null)
+ {
+ int current = currentChange.getSeqnum();
+ if (current == max)
+ {
+ }
+ else if (current < max)
+ {
+ totalCount += max - current;
+ }
+ else
+ {
+ totalCount += Integer.MAX_VALUE - (current - max) + 1;
+ }
+ }
+ else
+ {
+ totalCount += max;
+ }
+ }
+ return totalCount;
+ }
}
}
@@ -576,7 +611,7 @@
/* TODO : size should be configurable
* and larger than max-receive-queue-size
*/
- while (msgQueue.size() > 10000)
+ while (msgQueue.size() > maxQueueSize)
{
following = false;
msgQueue.removeFirst();
@@ -687,7 +722,7 @@
{
synchronized (msgQueue)
{
- if (msgQueue.size() < 10000)
+ if (msgQueue.size() < maxQueueSize)
{
following = true;
}
@@ -1026,6 +1061,8 @@
baseDn.toString()));
attributes.add(new Attribute("waiting-changes",
String.valueOf(getRcvMsgQueueSize())));
+ attributes.add(new Attribute("max-waiting-changes",
+ String.valueOf(maxQueueSize)));
attributes.add(new Attribute("update-waiting-acks",
String.valueOf(getWaitingAckSize())));
attributes.add(new Attribute("update-sent",
diff --git a/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java b/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
index ec67d61..710f6a8 100644
--- a/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
+++ b/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
@@ -48,11 +48,10 @@
timeStamp = Long.parseLong(temp, 16);
temp = str.substring(16, 20);
- seqnum = Integer.parseInt(temp, 16);
-
- temp = str.substring(20, 24);
serverId = Short.parseShort(temp, 16);
+ temp = str.substring(20, 28);
+ seqnum = Integer.parseInt(temp, 16);
}
/**
@@ -140,7 +139,7 @@
*/
public String toString()
{
- return String.format("%016x%04x%04x", timeStamp, seqnum, serverId);
+ return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
}
/**
diff --git a/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java b/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
index c865276..c3496f9 100644
--- a/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
+++ b/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
@@ -37,21 +37,44 @@
public class ChangeNumberGenerator
{
private long lastTime;
- private int seqnum = 0;
+ private int seqnum;
private short serverId;
/**
* Create a new ChangeNumber Generator.
- * @param id id to use when creating change numbers
- * @param timestamp time to start with
+ * @param id id to use when creating change numbers.
+ * @param timestamp time to start with.
*/
public ChangeNumberGenerator(short id, long timestamp)
{
- lastTime = timestamp;
- serverId = id;
+ this.lastTime = timestamp;
+ this.serverId = id;
+ this.seqnum = 0;
}
/**
+ * Create a new ChangeNumber Generator.
+ *
+ * @param id id to use when creating change numbers.
+ * @param state This generator will be created in a way that makes sure that
+ * all change numbers generated will be larger than all the
+ * changenumbers currently in state.
+ */
+ public ChangeNumberGenerator(short id, ServerState state)
+ {
+ this.lastTime = TimeThread.getTime();
+ for (short stateId : state)
+ {
+ if (this.lastTime < state.getMaxChangeNumber(stateId).getTime())
+ this.lastTime = state.getMaxChangeNumber(stateId).getTime();
+ if (stateId == id)
+ this.seqnum = state.getMaxChangeNumber(id).getSeqnum();
+ }
+ this.serverId = id;
+
+ }
+
+ /**
* Generate a new ChangeNumber.
*
* @return the generated ChangeNUmber
@@ -65,17 +88,12 @@
{
if (curTime > lastTime)
{
- seqnum = 0;
lastTime = curTime;
}
- else
+
+ if (seqnum++ == 0)
{
- seqnum++;
- if (seqnum > 0xFFFF)
- {
- lastTime++;
- seqnum = 0;
- }
+ lastTime++;
}
}
@@ -94,7 +112,6 @@
public void adjust(ChangeNumber number)
{
long rcvdTime = number.getTime();
- int rcvdSeqnum = number.getSeqnum();
/* need to synchronize with NewChangeNumber method so that we
* protect writing of seqnum and lastTime fields
@@ -103,19 +120,8 @@
{
if (lastTime > rcvdTime)
return;
- if (lastTime == rcvdTime)
- {
- if (seqnum < rcvdSeqnum)
- {
- seqnum = rcvdSeqnum;
- }
- return;
- }
- if (lastTime < rcvdTime)
- {
- lastTime = rcvdTime;
- seqnum = rcvdSeqnum;
- }
+ else
+ lastTime = lastTime++;
}
}
}
diff --git a/opends/src/server/org/opends/server/synchronization/common/ServerState.java b/opends/src/server/org/opends/server/synchronization/common/ServerState.java
index e5d741f..a0b8346 100644
--- a/opends/src/server/org/opends/server/synchronization/common/ServerState.java
+++ b/opends/src/server/org/opends/server/synchronization/common/ServerState.java
@@ -31,6 +31,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.zip.DataFormatException;
@@ -44,7 +45,7 @@
* from each server.
* It is exchanged with the changelog servers at connection establishment time.
*/
-public class ServerState
+public class ServerState implements Iterable<Short>
{
private HashMap<Short, ChangeNumber> list;
@@ -281,4 +282,12 @@
return result;
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public Iterator<Short> iterator()
+ {
+ return list.keySet().iterator();
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index a11c2bd..cb6440f 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,7 +27,6 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -308,11 +307,7 @@
monitor = new SynchronizationMonitor(this);
DirectoryServer.registerMonitorProvider(monitor);
- // TODO : read RUV from database an make sure we don't
- // generate changeNumber smaller than ChangeNumbers in the RUV
- long startingChangeNumber = getTime();
- changeNumberGenerator = new ChangeNumberGenerator(serverId,
- startingChangeNumber);
+ changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
/*
* create the broker object used to publish and receive changes
*/
@@ -1213,7 +1208,8 @@
{
synchronized (pendingChanges)
{
- pendingChanges.remove(changeNumber);
+ PendingChange change = pendingChanges.get(changeNumber);
+ change.setCommitted(true);
pushCommittedChanges();
}
}
@@ -1631,7 +1627,8 @@
while ((firstChange != null) && firstChange.isCommitted())
{
- if (firstChange.getOp().isSynchronizationOperation() == false)
+ if ((firstChange.getOp() != null ) &&
+ (firstChange.getOp().isSynchronizationOperation() == false))
{
numSentUpdates++;
broker.publish(firstChange.getMsg());
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
index cb35f38..e9c5f55 100644
--- a/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
@@ -65,10 +65,9 @@
throw new DataFormatException("byte[] is not a valid modify msg");
int pos = 1;
- /* read the changeNumber
- * it is always 24 characters long
- */
- String changenumberStr = new String(in, pos, 24, "UTF-8");
+ /* read the changeNumber */
+ int length = getNextLength(in, pos);
+ String changenumberStr = new String(in, pos, length, "UTF-8");
changeNumber = new ChangeNumber(changenumberStr);
pos +=24;
} catch (UnsupportedEncodingException e)
@@ -95,7 +94,8 @@
{
try
{
- int length = 1 + 24;
+ byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
+ int length = 1 + changeNumberByte.length + 1;
byte[] resultByteArray = new byte[length];
int pos = 1;
@@ -103,14 +103,8 @@
resultByteArray[0] = MSG_TYPE_ACK;
/* put the ChangeNumber */
- byte[] changeNumberByte;
+ pos = addByteArray(changeNumberByte, resultByteArray, pos);
- changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8");
-
- for (int i=0; i<24; i++,pos++)
- {
- resultByteArray[pos] = changeNumberByte[i];
- }
return resultByteArray;
} catch (UnsupportedEncodingException e)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index c19f9a2..d5aff11 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -953,13 +953,13 @@
throws InterruptedException, DirectoryException
{
Entry newEntry = null ;
- int i = timeout/50;
+ int i = timeout/200;
if (i<1)
i=1;
newEntry = DirectoryServer.getEntry(dn);
while ((i> 0) && ((newEntry == null) == exist))
{
- Thread.sleep(50);
+ Thread.sleep(200);
newEntry = DirectoryServer.getEntry(dn);
i--;
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
similarity index 96%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
index fc978db..186343e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
@@ -24,7 +24,7 @@
*
* Portions Copyright 2006 Sun Microsystems, Inc.
*/
-package org.opends.server.changelog;
+package org.opends.server.synchronization.changelog;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
similarity index 98%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
index 11ad842..e753523 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
@@ -24,7 +24,7 @@
*
* Portions Copyright 2006 Sun Microsystems, Inc.
*/
-package org.opends.server.changelog;
+package org.opends.server.synchronization.changelog;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
index 07f1fc5..57da6ac 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
@@ -53,7 +53,7 @@
{TimeThread.getTime(), (short) 123, (short) 45}
};
}
-
+
/**
* Test ChangeNumber constructor
*/
@@ -71,6 +71,21 @@
}
/**
+ * Test toString and constructor from String
+ */
+ @Test(dataProvider = "changeNumberData")
+ public void ChangeNumberEncodeDecode(long time, int seq, short id)
+ throws Exception
+ {
+ // Create 2 ChangeNumber with the same data and check equality
+ ChangeNumber cn = new ChangeNumber(time,seq,id);
+ ChangeNumber cn2 = new ChangeNumber(cn.toString());
+
+ assertEquals(cn, cn2,
+ "The encoding/decoding of ChangeNumber is not reversible");
+ }
+
+ /**
* Create ChangeNumber
*/
@DataProvider(name = "createChangeNumber")
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
index 9fc61f1..ae1c874 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
@@ -94,8 +94,8 @@
UUID uuid = UUID.randomUUID();
// Create the att values list of uuid
- LinkedHashSet<AttributeValue> valuesUuid = new LinkedHashSet<AttributeValue>(
- 1);
+ LinkedHashSet<AttributeValue> valuesUuid =
+ new LinkedHashSet<AttributeValue>(1);
valuesUuid.add(new AttributeValue(Historical.entryuuidAttrType,
new ASN1OctetString(uuid.toString())));
ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1);
@@ -110,23 +110,6 @@
.getOperationalAttributes();
operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
- // Create the att values list of historicalAttr
- String stringVal =
- "ds-sync-hist:00000108b3a6cbb800000001:repl:00000108b3a6cbb800000002";
-
- AttributeValue val = new AttributeValue(Historical.historicalAttrType,
- stringVal);
- LinkedHashSet<AttributeValue> valuesHist =
- new LinkedHashSet<AttributeValue>(1);
- valuesHist.add(val);
- ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
- Attribute histAttr = new Attribute(Historical.historicalAttrType,
- "ds-sync-hist", valuesHist);
- histList.add(histAttr);
-
- //Add the historical att in the entry
- operationalAttributes.put(Historical.historicalAttrType,histList) ;
-
// load historical from the entry
Historical hist = Historical.load(entry);
@@ -283,25 +266,8 @@
operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
- // Create the att values list of historicalAttr
- String stringVal =
- "ds-sync-hist:00000108b3a6cbb800000001:del:00000108b3a6cbb800000002";
-
- AttributeValue val = new AttributeValue(Historical.historicalAttrType,
- stringVal);
- LinkedHashSet<AttributeValue> valuesHist =
- new LinkedHashSet<AttributeValue>(1);
- valuesHist.add(val);
- ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
- Attribute histAttr = new Attribute(Historical.historicalAttrType,
- "ds-sync-hist", valuesHist);
- histList.add(0, histAttr);
-
- //Add the historical att in the entry
- entry.putAttribute(Historical.historicalAttrType,histList) ;
// load historical from the entry
-
Historical hist = Historical.load(entry);
/*
@@ -377,25 +343,8 @@
.getOperationalAttributes();
operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
- // Create the att values list of historicalAttr
- String stringVal =
- "ds-sync-hist:00000108b3a6cbb800000001:add:00000108b3a6cbb800000002";
-
- AttributeValue val = new AttributeValue(Historical.historicalAttrType,
- stringVal);
- LinkedHashSet<AttributeValue> valuesHist =
- new LinkedHashSet<AttributeValue>(1);
- valuesHist.add(val);
- ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
- Attribute histAttr = new Attribute(Historical.historicalAttrType,
- "ds-sync-hist", valuesHist);
- histList.add(histAttr);
-
- //Add the historycal att in the entry
- entry.putAttribute(Historical.historicalAttrType,histList) ;
-
+
// load historical from the entry
-
Historical hist = Historical.load(entry);
/*
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
index ce766ae..ec9c1ab 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
@@ -49,7 +49,6 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.protocols.ldap.LDAPFilter;
-import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -78,6 +77,7 @@
public class ProtocolWindowTest
{
private static final int WINDOW_SIZE = 10;
+ private static final int CHANGELOG_QUEUE_SIZE = 100;
private static final String SYNCHRONIZATION_STRESS_TEST =
"Synchronization Stress Test";
@@ -146,7 +146,7 @@
* the client receives the correct number of operations.
*/
@Test(enabled=true, groups="slow")
- public void saturateAndRestart() throws Exception
+ public void saturateQueueAndRestart() throws Exception
{
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
@@ -166,6 +166,7 @@
*/
Thread.sleep(1500);
assertTrue(checkWindows(WINDOW_SIZE));
+ assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE));
// Create an Entry (add operation) that will be later used in the test.
Entry tmp = personEntry.duplicate();
@@ -192,8 +193,9 @@
assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
"The received ADD synchronization message is not for the excepted DN");
- // send twice the window modify operations
- int count = WINDOW_SIZE * 2;
+ // send (2 * window + changelog queue) modify operations
+ // so that window + changelog queue get stuck in the changelog queue
+ int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
processModify(count);
// let some time to the message to reach the changelog client
@@ -216,7 +218,7 @@
/*
* check that we received all updates
*/
- assertEquals(rcvCount, WINDOW_SIZE*2);
+ assertEquals(rcvCount, count);
}
finally {
broker.stop();
@@ -225,6 +227,22 @@
}
/**
+ * Check that the Changelog queue size has correctly been configured
+ * by reading the monitoring information.
+ * @throws LDAPException
+ */
+ private boolean checkChangelogQueueSize(int changelog_queue_size)
+ throws LDAPException
+ {
+ InternalSearchOperation op = connection.processSearch(
+ new ASN1OctetString("cn=monitor"),
+ SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(
+ "(max-waiting-changes=" + changelog_queue_size + ")"));
+ assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+ return (op.getEntriesSent() == 2);
+ }
+
+ /**
* Check that the window configuration has been successfull
* by reading the monitoring information and checking
* that we do have 2 entries with the configured max-rcv-window.
@@ -238,10 +256,11 @@
assertEquals(op.getResultCode(), ResultCode.SUCCESS);
return (op.getEntriesSent() == 3);
}
-
+
/**
- * Search that the changelog has stopped sending changes after
+ * Search that the changelog has stopped sending changes after
* having reach the limit of the window size.
+ * And that the number of waiting changes is accurate.
* Do this by checking the monitoring information.
*/
private boolean searchUpdateSent() throws Exception
@@ -251,6 +270,16 @@
SearchScope.WHOLE_SUBTREE,
LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+ if (op.getEntriesSent() != 1)
+ return false;
+
+ op = connection.processSearch(
+ new ASN1OctetString("cn=monitor"),
+ SearchScope.WHOLE_SUBTREE,
+ LDAPFilter.decode("(waiting-changes=" +
+ (CHANGELOG_QUEUE_SIZE + WINDOW_SIZE) + ")"));
+ assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+
return (op.getEntriesSent() == 1);
}
@@ -316,7 +345,8 @@
+ "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
+ "ds-cfg-changelog-server-id: 1\n"
- + "ds-cfg-window-size: " + WINDOW_SIZE;
+ + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
+ + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
// suffix synchronized
--
Gitblit v1.10.0