The synchronization changelog monitoring information has a counter named
waiting-changes that publish the number of updates known by the changelog
server that have not yest been sent to each ldap server because they are too slow to replay them.
This is calculated from the list of changes in memory when this list is small
enough. However when this list becomes too large it is not possible to keep
all the changes in memory and the changes therefore goes to disk.
This monitoring information therefore becomes unavailable and currently returns MAXINT.
This is unfortunate because this is the time when it is the most important.
Unfortunately the Java edition of the berkeley DB does not provide a way to
know the record number of a given db entry and therefore we can't rely on the DB
to tell us how many changes stay in the db after the current change.
This fix therefore change the format of the ChangeNumber so that the ChangeNumber
now uses consecutive numbers for its seqnum part.
The number of missing changes can therefore be calculated by a simple substraction
of the current seqnum and the largest seqnum.
the drawback of this method is that it is only accurate if the majority of update
operations done on the masters are successfull because this method also count the failed
operations.
This fix also modify the ProtocolWindowTest in order to add tests of this waiting-changes
monitoring information.
This fix also makes the size of the memory queue of messages configurable.
2 files renamed
13 files modified
| | |
| | | objectClass: top |
| | | objectClass: ds-cfg-synchronization-provider |
| | | ds-cfg-synchronization-provider-enabled: true |
| | | ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization |
| | | ds-cfg-synchronization-provider-class: org.opends.server.synchronization.plugin.MultimasterSynchronization |
| | | |
| | | dn: cn=example, cn=Multimaster Synchronization,cn=Synchronization Providers,cn=config |
| | | objectClass: top |
| | |
| | | NAME 'ds-cfg-changelog-port' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.290 |
| | | NAME 'ds-cfg-changelog-max-queue-size' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 |
| | | SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.278 |
| | | NAME 'ds-cfg-changelog-server-id' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 |
| | |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME |
| | | 'ds-cfg-synchronization-changelog-server-config' SUP top |
| | | STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port ) |
| | | MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $ |
| | | ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory' |
| | | SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | |
| | | new ArrayList<ConfigAttribute>(); |
| | | private ChangelogDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | | |
| | | static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id"; |
| | | static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; |
| | | static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; |
| | | static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size"; |
| | | |
| | | static final IntegerConfigAttribute changelogPortStub = |
| | | new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port", |
| | |
| | | new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | static final IntegerConfigAttribute queueSizeStub = |
| | | new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | /** |
| | | * Check if a ConfigEntry is valid. |
| | | * @param config The config entry that needs to be checked. |
| | |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | IntegerConfigAttribute queueSizeAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub); |
| | | if (queueSizeAttr == null) |
| | | queueSize = 10000; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | queueSize = queueSizeAttr.activeIntValue(); |
| | | configAttributes.add(queueSizeAttr); |
| | | } |
| | | |
| | | initialize(changelogServerId, changelogPort); |
| | | |
| | | configDn = config.getDN(); |
| | |
| | | newSocket = listenSocket.accept(); |
| | | newSocket.setReceiveBufferSize(1000000); |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(newSocket)); |
| | | new SocketSession(newSocket), queueSize); |
| | | handler.start(null, serverId, serverURL, rcvWindow, this); |
| | | } catch (IOException e) |
| | | { |
| | |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(socket)); |
| | | new SocketSession(socket), queueSize); |
| | | handler.start(baseDn, serverId, serverURL, rcvWindow, this); |
| | | } |
| | | catch (IOException e) |
| | |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private int maxQueueSize = 10000; |
| | | private int restartReceiveQueue; |
| | | private int restartSendQueue; |
| | | private int restartReceiveDelay; |
| | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | | * |
| | | * @param session The ProtocolSession used by the ServerHandler to |
| | | * @param session The ProtocolSession used by the ServerHandler to |
| | | * communicate with the remote entity. |
| | | * @param queueSize The maximum number of update that will be kept |
| | | * in memory by this ServerHandler. |
| | | */ |
| | | public ServerHandler(ProtocolSession session) |
| | | public ServerHandler(ProtocolSession session, int queueSize) |
| | | { |
| | | super("Server Handler"); |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | | * TODO : When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The calculation should be done by asking to the each dbHandler |
| | | * how many changes need to be replicated and making the sum |
| | | * For now just return maxint in this case |
| | | */ |
| | | /* |
| | | * When the server is up to date or close to be up to date, |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (isFollowing()) |
| | | return msgQueue.size(); |
| | | else |
| | | return Integer.MAX_VALUE; |
| | | { |
| | | /* |
| | | * When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The total size of teh receieve queue is calculated by doing |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | int totalCount = 0; |
| | | ServerState dbState = changelogCache.getDbServerState(); |
| | | for (short id : dbState) |
| | | { |
| | | int max = dbState.getMaxChangeNumber(id).getSeqnum(); |
| | | ChangeNumber currentChange = serverState.getMaxChangeNumber(id); |
| | | if (currentChange != null) |
| | | { |
| | | int current = currentChange.getSeqnum(); |
| | | if (current == max) |
| | | { |
| | | } |
| | | else if (current < max) |
| | | { |
| | | totalCount += max - current; |
| | | } |
| | | else |
| | | { |
| | | totalCount += Integer.MAX_VALUE - (current - max) + 1; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | totalCount += max; |
| | | } |
| | | } |
| | | return totalCount; |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | while (msgQueue.size() > 10000) |
| | | while (msgQueue.size() > maxQueueSize) |
| | | { |
| | | following = false; |
| | | msgQueue.removeFirst(); |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.size() < 10000) |
| | | if (msgQueue.size() < maxQueueSize) |
| | | { |
| | | following = true; |
| | | } |
| | |
| | | baseDn.toString())); |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | attributes.add(new Attribute("max-waiting-changes", |
| | | String.valueOf(maxQueueSize))); |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | | String.valueOf(getWaitingAckSize()))); |
| | | attributes.add(new Attribute("update-sent", |
| | |
| | | timeStamp = Long.parseLong(temp, 16); |
| | | |
| | | temp = str.substring(16, 20); |
| | | seqnum = Integer.parseInt(temp, 16); |
| | | |
| | | temp = str.substring(20, 24); |
| | | serverId = Short.parseShort(temp, 16); |
| | | |
| | | temp = str.substring(20, 28); |
| | | seqnum = Integer.parseInt(temp, 16); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public String toString() |
| | | { |
| | | return String.format("%016x%04x%04x", timeStamp, seqnum, serverId); |
| | | return String.format("%016x%04x%08x", timeStamp, serverId, seqnum); |
| | | } |
| | | |
| | | /** |
| | |
| | | public class ChangeNumberGenerator |
| | | { |
| | | private long lastTime; |
| | | private int seqnum = 0; |
| | | private int seqnum; |
| | | private short serverId; |
| | | |
| | | /** |
| | | * Create a new ChangeNumber Generator. |
| | | * @param id id to use when creating change numbers |
| | | * @param timestamp time to start with |
| | | * @param id id to use when creating change numbers. |
| | | * @param timestamp time to start with. |
| | | */ |
| | | public ChangeNumberGenerator(short id, long timestamp) |
| | | { |
| | | lastTime = timestamp; |
| | | serverId = id; |
| | | this.lastTime = timestamp; |
| | | this.serverId = id; |
| | | this.seqnum = 0; |
| | | } |
| | | |
| | | /** |
| | | * Create a new ChangeNumber Generator. |
| | | * |
| | | * @param id id to use when creating change numbers. |
| | | * @param state This generator will be created in a way that makes sure that |
| | | * all change numbers generated will be larger than all the |
| | | * changenumbers currently in state. |
| | | */ |
| | | public ChangeNumberGenerator(short id, ServerState state) |
| | | { |
| | | this.lastTime = TimeThread.getTime(); |
| | | for (short stateId : state) |
| | | { |
| | | if (this.lastTime < state.getMaxChangeNumber(stateId).getTime()) |
| | | this.lastTime = state.getMaxChangeNumber(stateId).getTime(); |
| | | if (stateId == id) |
| | | this.seqnum = state.getMaxChangeNumber(id).getSeqnum(); |
| | | } |
| | | this.serverId = id; |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Generate a new ChangeNumber. |
| | | * |
| | | * @return the generated ChangeNUmber |
| | |
| | | { |
| | | if (curTime > lastTime) |
| | | { |
| | | seqnum = 0; |
| | | lastTime = curTime; |
| | | } |
| | | else |
| | | |
| | | if (seqnum++ == 0) |
| | | { |
| | | seqnum++; |
| | | if (seqnum > 0xFFFF) |
| | | { |
| | | lastTime++; |
| | | seqnum = 0; |
| | | } |
| | | lastTime++; |
| | | } |
| | | } |
| | | |
| | |
| | | public void adjust(ChangeNumber number) |
| | | { |
| | | long rcvdTime = number.getTime(); |
| | | int rcvdSeqnum = number.getSeqnum(); |
| | | |
| | | /* need to synchronize with NewChangeNumber method so that we |
| | | * protect writing of seqnum and lastTime fields |
| | |
| | | { |
| | | if (lastTime > rcvdTime) |
| | | return; |
| | | if (lastTime == rcvdTime) |
| | | { |
| | | if (seqnum < rcvdSeqnum) |
| | | { |
| | | seqnum = rcvdSeqnum; |
| | | } |
| | | return; |
| | | } |
| | | if (lastTime < rcvdTime) |
| | | { |
| | | lastTime = rcvdTime; |
| | | seqnum = rcvdSeqnum; |
| | | } |
| | | else |
| | | lastTime = lastTime++; |
| | | } |
| | | } |
| | | } |
| | |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.zip.DataFormatException; |
| | |
| | | * from each server. |
| | | * It is exchanged with the changelog servers at connection establishment time. |
| | | */ |
| | | public class ServerState |
| | | public class ServerState implements Iterable<Short> |
| | | { |
| | | private HashMap<Short, ChangeNumber> list; |
| | | |
| | |
| | | return result; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public Iterator<Short> iterator() |
| | | { |
| | | return list.keySet().iterator(); |
| | | } |
| | | } |
| | |
| | | package org.opends.server.synchronization.plugin; |
| | | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.util.TimeThread.getTime; |
| | | import static org.opends.server.synchronization.common.LogMessages.*; |
| | | import static org.opends.server.synchronization.plugin.Historical.*; |
| | | import static org.opends.server.synchronization.protocol.OperationContext.*; |
| | |
| | | monitor = new SynchronizationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | // TODO : read RUV from database an make sure we don't |
| | | // generate changeNumber smaller than ChangeNumbers in the RUV |
| | | long startingChangeNumber = getTime(); |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, |
| | | startingChangeNumber); |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, state); |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | |
| | | { |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(changeNumber); |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | change.setCommitted(true); |
| | | pushCommittedChanges(); |
| | | } |
| | | } |
| | |
| | | |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | if (firstChange.getOp().isSynchronizationOperation() == false) |
| | | if ((firstChange.getOp() != null ) && |
| | | (firstChange.getOp().isSynchronizationOperation() == false)) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | |
| | | throw new DataFormatException("byte[] is not a valid modify msg"); |
| | | int pos = 1; |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | /* read the changeNumber */ |
| | | int length = getNextLength(in, pos); |
| | | String changenumberStr = new String(in, pos, length, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | } catch (UnsupportedEncodingException e) |
| | |
| | | { |
| | | try |
| | | { |
| | | int length = 1 + 24; |
| | | byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); |
| | | int length = 1 + changeNumberByte.length + 1; |
| | | byte[] resultByteArray = new byte[length]; |
| | | int pos = 1; |
| | | |
| | |
| | | resultByteArray[0] = MSG_TYPE_ACK; |
| | | |
| | | /* put the ChangeNumber */ |
| | | byte[] changeNumberByte; |
| | | pos = addByteArray(changeNumberByte, resultByteArray, pos); |
| | | |
| | | changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); |
| | | |
| | | for (int i=0; i<24; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = changeNumberByte[i]; |
| | | } |
| | | return resultByteArray; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | throws InterruptedException, DirectoryException |
| | | { |
| | | Entry newEntry = null ; |
| | | int i = timeout/50; |
| | | int i = timeout/200; |
| | | if (i<1) |
| | | i=1; |
| | | newEntry = DirectoryServer.getEntry(dn); |
| | | while ((i> 0) && ((newEntry == null) == exist)) |
| | | { |
| | | Thread.sleep(50); |
| | | Thread.sleep(200); |
| | | newEntry = DirectoryServer.getEntry(dn); |
| | | i--; |
| | | } |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java |
| | |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java |
| | |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | |
| | | {TimeThread.getTime(), (short) 123, (short) 45} |
| | | }; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Test ChangeNumber constructor |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Test toString and constructor from String |
| | | */ |
| | | @Test(dataProvider = "changeNumberData") |
| | | public void ChangeNumberEncodeDecode(long time, int seq, short id) |
| | | throws Exception |
| | | { |
| | | // Create 2 ChangeNumber with the same data and check equality |
| | | ChangeNumber cn = new ChangeNumber(time,seq,id); |
| | | ChangeNumber cn2 = new ChangeNumber(cn.toString()); |
| | | |
| | | assertEquals(cn, cn2, |
| | | "The encoding/decoding of ChangeNumber is not reversible"); |
| | | } |
| | | |
| | | /** |
| | | * Create ChangeNumber |
| | | */ |
| | | @DataProvider(name = "createChangeNumber") |
| | |
| | | UUID uuid = UUID.randomUUID(); |
| | | |
| | | // Create the att values list of uuid |
| | | LinkedHashSet<AttributeValue> valuesUuid = new LinkedHashSet<AttributeValue>( |
| | | 1); |
| | | LinkedHashSet<AttributeValue> valuesUuid = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesUuid.add(new AttributeValue(Historical.entryuuidAttrType, |
| | | new ASN1OctetString(uuid.toString()))); |
| | | ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1); |
| | |
| | | .getOperationalAttributes(); |
| | | operationalAttributes.put(Historical.entryuuidAttrType, uuidList); |
| | | |
| | | // Create the att values list of historicalAttr |
| | | String stringVal = |
| | | "ds-sync-hist:00000108b3a6cbb800000001:repl:00000108b3a6cbb800000002"; |
| | | |
| | | AttributeValue val = new AttributeValue(Historical.historicalAttrType, |
| | | stringVal); |
| | | LinkedHashSet<AttributeValue> valuesHist = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesHist.add(val); |
| | | ArrayList<Attribute> histList = new ArrayList<Attribute>(1); |
| | | Attribute histAttr = new Attribute(Historical.historicalAttrType, |
| | | "ds-sync-hist", valuesHist); |
| | | histList.add(histAttr); |
| | | |
| | | //Add the historical att in the entry |
| | | operationalAttributes.put(Historical.historicalAttrType,histList) ; |
| | | |
| | | // load historical from the entry |
| | | Historical hist = Historical.load(entry); |
| | | |
| | |
| | | |
| | | operationalAttributes.put(Historical.entryuuidAttrType, uuidList); |
| | | |
| | | // Create the att values list of historicalAttr |
| | | String stringVal = |
| | | "ds-sync-hist:00000108b3a6cbb800000001:del:00000108b3a6cbb800000002"; |
| | | |
| | | AttributeValue val = new AttributeValue(Historical.historicalAttrType, |
| | | stringVal); |
| | | LinkedHashSet<AttributeValue> valuesHist = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesHist.add(val); |
| | | ArrayList<Attribute> histList = new ArrayList<Attribute>(1); |
| | | Attribute histAttr = new Attribute(Historical.historicalAttrType, |
| | | "ds-sync-hist", valuesHist); |
| | | histList.add(0, histAttr); |
| | | |
| | | //Add the historical att in the entry |
| | | entry.putAttribute(Historical.historicalAttrType,histList) ; |
| | | |
| | | // load historical from the entry |
| | | |
| | | Historical hist = Historical.load(entry); |
| | | |
| | | /* |
| | |
| | | .getOperationalAttributes(); |
| | | |
| | | operationalAttributes.put(Historical.entryuuidAttrType, uuidList); |
| | | // Create the att values list of historicalAttr |
| | | String stringVal = |
| | | "ds-sync-hist:00000108b3a6cbb800000001:add:00000108b3a6cbb800000002"; |
| | | |
| | | AttributeValue val = new AttributeValue(Historical.historicalAttrType, |
| | | stringVal); |
| | | LinkedHashSet<AttributeValue> valuesHist = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesHist.add(val); |
| | | ArrayList<Attribute> histList = new ArrayList<Attribute>(1); |
| | | Attribute histAttr = new Attribute(Historical.historicalAttrType, |
| | | "ds-sync-hist", valuesHist); |
| | | histList.add(histAttr); |
| | | |
| | | //Add the historycal att in the entry |
| | | entry.putAttribute(Historical.historicalAttrType,histList) ; |
| | | |
| | | |
| | | // load historical from the entry |
| | | |
| | | Historical hist = Historical.load(entry); |
| | | |
| | | /* |
| | |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPException; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.synchronization.common.ServerState; |
| | | import org.opends.server.synchronization.plugin.ChangelogBroker; |
| | | import org.opends.server.synchronization.plugin.MultimasterSynchronization; |
| | | import org.opends.server.synchronization.plugin.PersistentServerState; |
| | |
| | | public class ProtocolWindowTest |
| | | { |
| | | private static final int WINDOW_SIZE = 10; |
| | | private static final int CHANGELOG_QUEUE_SIZE = 100; |
| | | |
| | | private static final String SYNCHRONIZATION_STRESS_TEST = |
| | | "Synchronization Stress Test"; |
| | |
| | | * the client receives the correct number of operations. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | | public void saturateAndRestart() throws Exception |
| | | public void saturateQueueAndRestart() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | |
| | | */ |
| | | Thread.sleep(1500); |
| | | assertTrue(checkWindows(WINDOW_SIZE)); |
| | | assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE)); |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | // send twice the window modify operations |
| | | int count = WINDOW_SIZE * 2; |
| | | // send (2 * window + changelog queue) modify operations |
| | | // so that window + changelog queue get stuck in the changelog queue |
| | | int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE; |
| | | processModify(count); |
| | | |
| | | // let some time to the message to reach the changelog client |
| | |
| | | /* |
| | | * check that we received all updates |
| | | */ |
| | | assertEquals(rcvCount, WINDOW_SIZE*2); |
| | | assertEquals(rcvCount, count); |
| | | } |
| | | finally { |
| | | broker.stop(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check that the Changelog queue size has correctly been configured |
| | | * by reading the monitoring information. |
| | | * @throws LDAPException |
| | | */ |
| | | private boolean checkChangelogQueueSize(int changelog_queue_size) |
| | | throws LDAPException |
| | | { |
| | | InternalSearchOperation op = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, LDAPFilter.decode( |
| | | "(max-waiting-changes=" + changelog_queue_size + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | return (op.getEntriesSent() == 2); |
| | | } |
| | | |
| | | /** |
| | | * Check that the window configuration has been successfull |
| | | * by reading the monitoring information and checking |
| | | * that we do have 2 entries with the configured max-rcv-window. |
| | |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | return (op.getEntriesSent() == 3); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Search that the changelog has stopped sending changes after |
| | | * Search that the changelog has stopped sending changes after |
| | | * having reach the limit of the window size. |
| | | * And that the number of waiting changes is accurate. |
| | | * Do this by checking the monitoring information. |
| | | */ |
| | | private boolean searchUpdateSent() throws Exception |
| | |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | if (op.getEntriesSent() != 1) |
| | | return false; |
| | | |
| | | op = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(waiting-changes=" + |
| | | (CHANGELOG_QUEUE_SIZE + WINDOW_SIZE) + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | |
| | | return (op.getEntriesSent() == 1); |
| | | } |
| | | |
| | |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n" |
| | | + "ds-cfg-changelog-server-id: 1\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" |
| | | + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; |
| | | changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); |
| | | |
| | | // suffix synchronized |