| | |
| | | objectClass: top |
| | | objectClass: ds-cfg-synchronization-provider |
| | | ds-cfg-synchronization-provider-enabled: true |
| | | ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization |
| | | ds-cfg-synchronization-provider-class: org.opends.server.synchronization.plugin.MultimasterSynchronization |
| | | |
| | | dn: cn=example, cn=Multimaster Synchronization,cn=Synchronization Providers,cn=config |
| | | objectClass: top |
| | |
| | | NAME 'ds-cfg-changelog-port' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.290 |
| | | NAME 'ds-cfg-changelog-max-queue-size' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 |
| | | SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.278 |
| | | NAME 'ds-cfg-changelog-server-id' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 |
| | |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME |
| | | 'ds-cfg-synchronization-changelog-server-config' SUP top |
| | | STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port ) |
| | | MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $ |
| | | ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory' |
| | | SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | |
| | | new ArrayList<ConfigAttribute>(); |
| | | private ChangelogDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | | |
| | | static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id"; |
| | | static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; |
| | | static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; |
| | | static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size"; |
| | | |
| | | static final IntegerConfigAttribute changelogPortStub = |
| | | new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port", |
| | |
| | | new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | static final IntegerConfigAttribute queueSizeStub = |
| | | new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | /** |
| | | * Check if a ConfigEntry is valid. |
| | | * @param config The config entry that needs to be checked. |
| | |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | IntegerConfigAttribute queueSizeAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub); |
| | | if (queueSizeAttr == null) |
| | | queueSize = 10000; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | queueSize = queueSizeAttr.activeIntValue(); |
| | | configAttributes.add(queueSizeAttr); |
| | | } |
| | | |
| | | initialize(changelogServerId, changelogPort); |
| | | |
| | | configDn = config.getDN(); |
| | |
| | | newSocket = listenSocket.accept(); |
| | | newSocket.setReceiveBufferSize(1000000); |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(newSocket)); |
| | | new SocketSession(newSocket), queueSize); |
| | | handler.start(null, serverId, serverURL, rcvWindow, this); |
| | | } catch (IOException e) |
| | | { |
| | |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(socket)); |
| | | new SocketSession(socket), queueSize); |
| | | handler.start(baseDn, serverId, serverURL, rcvWindow, this); |
| | | } |
| | | catch (IOException e) |
| | |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private int maxQueueSize = 10000; |
| | | private int restartReceiveQueue; |
| | | private int restartSendQueue; |
| | | private int restartReceiveDelay; |
| | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | | * |
| | | * @param session The ProtocolSession used by the ServerHandler to |
| | | * @param session The ProtocolSession used by the ServerHandler to |
| | | * communicate with the remote entity. |
| | | * @param queueSize The maximum number of updates that will be kept |
| | | * in memory by this ServerHandler. |
| | | */ |
| | | public ServerHandler(ProtocolSession session) |
| | | public ServerHandler(ProtocolSession session, int queueSize) |
| | | { |
| | | super("Server Handler"); |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | | * TODO : When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The calculation should be done by asking each dbHandler |
| | | * how many changes need to be replicated and making the sum |
| | | * For now just return maxint in this case |
| | | */ |
| | | /* |
| | | * When the server is up to date or close to be up to date, |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (isFollowing()) |
| | | return msgQueue.size(); |
| | | else |
| | | return Integer.MAX_VALUE; |
| | | { |
| | | /* |
| | | * When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The total size of the receive queue is calculated by doing |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | int totalCount = 0; |
| | | ServerState dbState = changelogCache.getDbServerState(); |
| | | for (short id : dbState) |
| | | { |
| | | int max = dbState.getMaxChangeNumber(id).getSeqnum(); |
| | | ChangeNumber currentChange = serverState.getMaxChangeNumber(id); |
| | | if (currentChange != null) |
| | | { |
| | | int current = currentChange.getSeqnum(); |
| | | if (current == max) |
| | | { |
| | | } |
| | | else if (current < max) |
| | | { |
| | | totalCount += max - current; |
| | | } |
| | | else |
| | | { |
| | | totalCount += Integer.MAX_VALUE - (current - max) + 1; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | totalCount += max; |
| | | } |
| | | } |
| | | return totalCount; |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | while (msgQueue.size() > 10000) |
| | | while (msgQueue.size() > maxQueueSize) |
| | | { |
| | | following = false; |
| | | msgQueue.removeFirst(); |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.size() < 10000) |
| | | if (msgQueue.size() < maxQueueSize) |
| | | { |
| | | following = true; |
| | | } |
| | |
| | | baseDn.toString())); |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | attributes.add(new Attribute("max-waiting-changes", |
| | | String.valueOf(maxQueueSize))); |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | | String.valueOf(getWaitingAckSize()))); |
| | | attributes.add(new Attribute("update-sent", |
| | |
| | | timeStamp = Long.parseLong(temp, 16); |
| | | |
| | | temp = str.substring(16, 20); |
| | | seqnum = Integer.parseInt(temp, 16); |
| | | |
| | | temp = str.substring(20, 24); |
| | | serverId = Short.parseShort(temp, 16); |
| | | |
| | | temp = str.substring(20, 28); |
| | | seqnum = Integer.parseInt(temp, 16); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public String toString() |
| | | { |
| | | return String.format("%016x%04x%04x", timeStamp, seqnum, serverId); |
| | | return String.format("%016x%04x%08x", timeStamp, serverId, seqnum); |
| | | } |
| | | |
| | | /** |
| | |
| | | public class ChangeNumberGenerator |
| | | { |
| | | private long lastTime; |
| | | private int seqnum = 0; |
| | | private int seqnum; |
| | | private short serverId; |
| | | |
| | | /** |
| | | * Create a new ChangeNumber Generator. |
| | | * @param id id to use when creating change numbers |
| | | * @param timestamp time to start with |
| | | * @param id id to use when creating change numbers. |
| | | * @param timestamp time to start with. |
| | | */ |
| | | public ChangeNumberGenerator(short id, long timestamp) |
| | | { |
| | | lastTime = timestamp; |
| | | serverId = id; |
| | | this.lastTime = timestamp; |
| | | this.serverId = id; |
| | | this.seqnum = 0; |
| | | } |
| | | |
| | | /** |
| | | * Create a new ChangeNumber Generator. |
| | | * |
| | | * @param id id to use when creating change numbers. |
| | | * @param state This generator will be created in a way that makes sure that |
| | | * all change numbers generated will be larger than all the |
| | | * changenumbers currently in state. |
| | | */ |
| | | public ChangeNumberGenerator(short id, ServerState state) |
| | | { |
| | | this.lastTime = TimeThread.getTime(); |
| | | for (short stateId : state) |
| | | { |
| | | if (this.lastTime < state.getMaxChangeNumber(stateId).getTime()) |
| | | this.lastTime = state.getMaxChangeNumber(stateId).getTime(); |
| | | if (stateId == id) |
| | | this.seqnum = state.getMaxChangeNumber(id).getSeqnum(); |
| | | } |
| | | this.serverId = id; |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Generate a new ChangeNumber. |
| | | * |
| | | * @return the generated ChangeNumber |
| | |
| | | { |
| | | if (curTime > lastTime) |
| | | { |
| | | seqnum = 0; |
| | | lastTime = curTime; |
| | | } |
| | | else |
| | | |
| | | if (seqnum++ == 0) |
| | | { |
| | | seqnum++; |
| | | if (seqnum > 0xFFFF) |
| | | { |
| | | lastTime++; |
| | | seqnum = 0; |
| | | } |
| | | lastTime++; |
| | | } |
| | | } |
| | | |
| | |
| | | public void adjust(ChangeNumber number) |
| | | { |
| | | long rcvdTime = number.getTime(); |
| | | int rcvdSeqnum = number.getSeqnum(); |
| | | |
| | | /* need to synchronize with NewChangeNumber method so that we |
| | | * protect writing of seqnum and lastTime fields |
| | |
| | | { |
| | | if (lastTime > rcvdTime) |
| | | return; |
| | | if (lastTime == rcvdTime) |
| | | { |
| | | if (seqnum < rcvdSeqnum) |
| | | { |
| | | seqnum = rcvdSeqnum; |
| | | } |
| | | return; |
| | | } |
| | | if (lastTime < rcvdTime) |
| | | { |
| | | lastTime = rcvdTime; |
| | | seqnum = rcvdSeqnum; |
| | | } |
| | | else |
| | | lastTime = lastTime++; |
| | | } |
| | | } |
| | | } |
| | |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.zip.DataFormatException; |
| | |
| | | * from each server. |
| | | * It is exchanged with the changelog servers at connection establishment time. |
| | | */ |
| | | public class ServerState |
| | | public class ServerState implements Iterable<Short> |
| | | { |
| | | private HashMap<Short, ChangeNumber> list; |
| | | |
| | |
| | | return result; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public Iterator<Short> iterator() |
| | | { |
| | | return list.keySet().iterator(); |
| | | } |
| | | } |
| | |
| | | package org.opends.server.synchronization.plugin; |
| | | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.util.TimeThread.getTime; |
| | | import static org.opends.server.synchronization.common.LogMessages.*; |
| | | import static org.opends.server.synchronization.plugin.Historical.*; |
| | | import static org.opends.server.synchronization.protocol.OperationContext.*; |
| | |
| | | monitor = new SynchronizationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | // TODO : read RUV from database and make sure we don't |
| | | // generate changeNumber smaller than ChangeNumbers in the RUV |
| | | long startingChangeNumber = getTime(); |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, |
| | | startingChangeNumber); |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, state); |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | |
| | | { |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(changeNumber); |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | change.setCommitted(true); |
| | | pushCommittedChanges(); |
| | | } |
| | | } |
| | |
| | | |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | if (firstChange.getOp().isSynchronizationOperation() == false) |
| | | if ((firstChange.getOp() != null ) && |
| | | (firstChange.getOp().isSynchronizationOperation() == false)) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | |
| | | throw new DataFormatException("byte[] is not a valid modify msg"); |
| | | int pos = 1; |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | /* read the changeNumber */ |
| | | int length = getNextLength(in, pos); |
| | | String changenumberStr = new String(in, pos, length, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | } catch (UnsupportedEncodingException e) |
| | |
| | | { |
| | | try |
| | | { |
| | | int length = 1 + 24; |
| | | byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); |
| | | int length = 1 + changeNumberByte.length + 1; |
| | | byte[] resultByteArray = new byte[length]; |
| | | int pos = 1; |
| | | |
| | |
| | | resultByteArray[0] = MSG_TYPE_ACK; |
| | | |
| | | /* put the ChangeNumber */ |
| | | byte[] changeNumberByte; |
| | | pos = addByteArray(changeNumberByte, resultByteArray, pos); |
| | | |
| | | changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); |
| | | |
| | | for (int i=0; i<24; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = changeNumberByte[i]; |
| | | } |
| | | return resultByteArray; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | throws InterruptedException, DirectoryException |
| | | { |
| | | Entry newEntry = null ; |
| | | int i = timeout/50; |
| | | int i = timeout/200; |
| | | if (i<1) |
| | | i=1; |
| | | newEntry = DirectoryServer.getEntry(dn); |
| | | while ((i> 0) && ((newEntry == null) == exist)) |
| | | { |
| | | Thread.sleep(50); |
| | | Thread.sleep(200); |
| | | newEntry = DirectoryServer.getEntry(dn); |
| | | i--; |
| | | } |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java |
| | |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java |
| | |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | |
| | | {TimeThread.getTime(), (short) 123, (short) 45} |
| | | }; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Test ChangeNumber constructor |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Test toString and constructor from String |
| | | */ |
| | | @Test(dataProvider = "changeNumberData") |
| | | public void ChangeNumberEncodeDecode(long time, int seq, short id) |
| | | throws Exception |
| | | { |
| | | // Create 2 ChangeNumber with the same data and check equality |
| | | ChangeNumber cn = new ChangeNumber(time,seq,id); |
| | | ChangeNumber cn2 = new ChangeNumber(cn.toString()); |
| | | |
| | | assertEquals(cn, cn2, |
| | | "The encoding/decoding of ChangeNumber is not reversible"); |
| | | } |
| | | |
| | | /** |
| | | * Create ChangeNumber |
| | | */ |
| | | @DataProvider(name = "createChangeNumber") |
| | |
| | | UUID uuid = UUID.randomUUID(); |
| | | |
| | | // Create the att values list of uuid |
| | | LinkedHashSet<AttributeValue> valuesUuid = new LinkedHashSet<AttributeValue>( |
| | | 1); |
| | | LinkedHashSet<AttributeValue> valuesUuid = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesUuid.add(new AttributeValue(Historical.entryuuidAttrType, |
| | | new ASN1OctetString(uuid.toString()))); |
| | | ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1); |
| | |
| | | .getOperationalAttributes(); |
| | | operationalAttributes.put(Historical.entryuuidAttrType, uuidList); |
| | | |
| | | // Create the att values list of historicalAttr |
| | | String stringVal = |
| | | "ds-sync-hist:00000108b3a6cbb800000001:repl:00000108b3a6cbb800000002"; |
| | | |
| | | AttributeValue val = new AttributeValue(Historical.historicalAttrType, |
| | | stringVal); |
| | | LinkedHashSet<AttributeValue> valuesHist = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesHist.add(val); |
| | | ArrayList<Attribute> histList = new ArrayList<Attribute>(1); |
| | | Attribute histAttr = new Attribute(Historical.historicalAttrType, |
| | | "ds-sync-hist", valuesHist); |
| | | histList.add(histAttr); |
| | | |
| | | //Add the historical att in the entry |
| | | operationalAttributes.put(Historical.historicalAttrType,histList) ; |
| | | |
| | | // load historical from the entry |
| | | Historical hist = Historical.load(entry); |
| | | |
| | |
| | | |
| | | operationalAttributes.put(Historical.entryuuidAttrType, uuidList); |
| | | |
| | | // Create the att values list of historicalAttr |
| | | String stringVal = |
| | | "ds-sync-hist:00000108b3a6cbb800000001:del:00000108b3a6cbb800000002"; |
| | | |
| | | AttributeValue val = new AttributeValue(Historical.historicalAttrType, |
| | | stringVal); |
| | | LinkedHashSet<AttributeValue> valuesHist = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesHist.add(val); |
| | | ArrayList<Attribute> histList = new ArrayList<Attribute>(1); |
| | | Attribute histAttr = new Attribute(Historical.historicalAttrType, |
| | | "ds-sync-hist", valuesHist); |
| | | histList.add(0, histAttr); |
| | | |
| | | //Add the historical att in the entry |
| | | entry.putAttribute(Historical.historicalAttrType,histList) ; |
| | | |
| | | // load historical from the entry |
| | | |
| | | Historical hist = Historical.load(entry); |
| | | |
| | | /* |
| | |
| | | .getOperationalAttributes(); |
| | | |
| | | operationalAttributes.put(Historical.entryuuidAttrType, uuidList); |
| | | // Create the att values list of historicalAttr |
| | | String stringVal = |
| | | "ds-sync-hist:00000108b3a6cbb800000001:add:00000108b3a6cbb800000002"; |
| | | |
| | | AttributeValue val = new AttributeValue(Historical.historicalAttrType, |
| | | stringVal); |
| | | LinkedHashSet<AttributeValue> valuesHist = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | | valuesHist.add(val); |
| | | ArrayList<Attribute> histList = new ArrayList<Attribute>(1); |
| | | Attribute histAttr = new Attribute(Historical.historicalAttrType, |
| | | "ds-sync-hist", valuesHist); |
| | | histList.add(histAttr); |
| | | |
| | | //Add the historical att in the entry |
| | | entry.putAttribute(Historical.historicalAttrType,histList) ; |
| | | |
| | | |
| | | // load historical from the entry |
| | | |
| | | Historical hist = Historical.load(entry); |
| | | |
| | | /* |
| | |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPException; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.synchronization.common.ServerState; |
| | | import org.opends.server.synchronization.plugin.ChangelogBroker; |
| | | import org.opends.server.synchronization.plugin.MultimasterSynchronization; |
| | | import org.opends.server.synchronization.plugin.PersistentServerState; |
| | |
| | | public class ProtocolWindowTest |
| | | { |
| | | private static final int WINDOW_SIZE = 10; |
| | | private static final int CHANGELOG_QUEUE_SIZE = 100; |
| | | |
| | | private static final String SYNCHRONIZATION_STRESS_TEST = |
| | | "Synchronization Stress Test"; |
| | |
| | | * the client receives the correct number of operations. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | | public void saturateAndRestart() throws Exception |
| | | public void saturateQueueAndRestart() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | |
| | | */ |
| | | Thread.sleep(1500); |
| | | assertTrue(checkWindows(WINDOW_SIZE)); |
| | | assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE)); |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | // send twice the window modify operations |
| | | int count = WINDOW_SIZE * 2; |
| | | // send (2 * window + changelog queue) modify operations |
| | | // so that window + changelog queue get stuck in the changelog queue |
| | | int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE; |
| | | processModify(count); |
| | | |
| | | // give the messages some time to reach the changelog client |
| | |
| | | /* |
| | | * check that we received all updates |
| | | */ |
| | | assertEquals(rcvCount, WINDOW_SIZE*2); |
| | | assertEquals(rcvCount, count); |
| | | } |
| | | finally { |
| | | broker.stop(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check that the Changelog queue size has correctly been configured |
| | | * by reading the monitoring information. |
| | | * @throws LDAPException |
| | | */ |
| | | private boolean checkChangelogQueueSize(int changelog_queue_size) |
| | | throws LDAPException |
| | | { |
| | | InternalSearchOperation op = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, LDAPFilter.decode( |
| | | "(max-waiting-changes=" + changelog_queue_size + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | return (op.getEntriesSent() == 2); |
| | | } |
| | | |
| | | /** |
| | | * Check that the window configuration has been successful |
| | | * by reading the monitoring information and checking |
| | | * that we do have 2 entries with the configured max-rcv-window. |
| | |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | return (op.getEntriesSent() == 3); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Search that the changelog has stopped sending changes after |
| | | * Search that the changelog has stopped sending changes after |
| | | * having reached the limit of the window size. |
| | | * And that the number of waiting changes is accurate. |
| | | * Do this by checking the monitoring information. |
| | | */ |
| | | private boolean searchUpdateSent() throws Exception |
| | |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | if (op.getEntriesSent() != 1) |
| | | return false; |
| | | |
| | | op = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(waiting-changes=" + |
| | | (CHANGELOG_QUEUE_SIZE + WINDOW_SIZE) + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | |
| | | return (op.getEntriesSent() == 1); |
| | | } |
| | | |
| | |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n" |
| | | + "ds-cfg-changelog-server-id: 1\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" |
| | | + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; |
| | | changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); |
| | | |
| | | // suffix synchronized |