mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
17.46.2006 46e6061d63562ce021ef8f3b5062d3eba1c2db4e
opends/resource/config/synchronization.ldif
@@ -14,7 +14,7 @@
objectClass: top
objectClass: ds-cfg-synchronization-provider
ds-cfg-synchronization-provider-enabled: true
ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization
ds-cfg-synchronization-provider-class: org.opends.server.synchronization.plugin.MultimasterSynchronization
dn: cn=example, cn=Multimaster Synchronization,cn=Synchronization Providers,cn=config
objectClass: top
opends/resource/schema/02-config.ldif
@@ -525,6 +525,10 @@
  NAME 'ds-cfg-changelog-port'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.290
  NAME 'ds-cfg-changelog-max-queue-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.278
  NAME 'ds-cfg-changelog-server-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -1303,7 +1307,8 @@
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
  'ds-cfg-synchronization-changelog-server-config' SUP top
  STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
  SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
  X-ORIGIN 'OpenDS Directory Server' )
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -98,11 +98,13 @@
          new ArrayList<ConfigAttribute>();
  private ChangelogDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final IntegerConfigAttribute changelogPortStub =
    new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -122,6 +124,10 @@
    new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
                               false, false, false, true, 0, false, 0);
  static final IntegerConfigAttribute queueSizeStub =
    new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
                               false, false, false, true, 0, false, 0);
  /**
   * Check if a ConfigEntry is valid.
   * @param config The config entry that needs to be checked.
@@ -247,6 +253,16 @@
      configAttributes.add(windowAttr);
    }
    IntegerConfigAttribute queueSizeAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub);
    if (queueSizeAttr == null)
      queueSize = 10000;  // Attribute is not present : use the default value
    else
    {
      queueSize = queueSizeAttr.activeIntValue();
      configAttributes.add(queueSizeAttr);
    }
    initialize(changelogServerId, changelogPort);
    configDn = config.getDN();
@@ -325,7 +341,7 @@
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket));
                                     new SocketSession(newSocket), queueSize);
        handler.start(null, serverId, serverURL, rcvWindow, this);
      } catch (IOException e)
      {
@@ -401,7 +417,7 @@
      socket.connect(ServerAddr, 500);
      ServerHandler handler = new ServerHandler(
                                      new SocketSession(socket));
                                      new SocketSession(socket), queueSize);
      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
    }
    catch (IOException e)
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -85,6 +85,7 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 10000;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
@@ -109,13 +110,16 @@
  /**
   * Creates a new server handler instance with the provided socket.
   *
   * @param  session The ProtocolSession used by the ServerHandler to
   * @param session The ProtocolSession used by the ServerHandler to
   *                 communicate with the remote entity.
   * @param queueSize The maximum number of update that will be kept
   *                  in memory by this ServerHandler.
   */
  public ServerHandler(ProtocolSession session)
  public ServerHandler(ProtocolSession session, int queueSize)
  {
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
  }
  /**
@@ -467,19 +471,50 @@
  {
   synchronized (msgQueue)
   {
     /*
      * TODO : When the server  is not able to follow, the msgQueue
      * may become too large and therefore won't contain all the
      * changes. Some changes may only be stored in the backing DB
      * of the servers.
      * The calculation should be done by asking to the each dbHandler
      * how many changes need to be replicated and making the sum
      * For now just return maxint in this case
      */
    /*
     * When the server is up to date or close to be up to date,
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.size();
     else
       return Integer.MAX_VALUE;
     {
       /*
        * When the server  is not able to follow, the msgQueue
        * may become too large and therefore won't contain all the
        * changes. Some changes may only be stored in the backing DB
        * of the servers.
        * The total size of teh receieve queue is calculated by doing
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = changelogCache.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
         if (currentChange != null)
         {
           int current = currentChange.getSeqnum();
           if (current == max)
           {
           }
           else if (current < max)
           {
             totalCount += max - current;
           }
           else
           {
             totalCount += Integer.MAX_VALUE - (current - max) + 1;
           }
         }
         else
         {
           totalCount += max;
         }
       }
       return totalCount;
     }
   }
  }
@@ -576,7 +611,7 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while (msgQueue.size() > 10000)
      while (msgQueue.size() > maxQueueSize)
      {
        following = false;
        msgQueue.removeFirst();
@@ -687,7 +722,7 @@
          {
            synchronized (msgQueue)
            {
              if (msgQueue.size() < 10000)
              if (msgQueue.size() < maxQueueSize)
              {
                following = true;
              }
@@ -1026,6 +1061,8 @@
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("max-waiting-changes",
                                 String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-waiting-acks",
                                 String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("update-sent",
opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
@@ -48,11 +48,10 @@
    timeStamp = Long.parseLong(temp, 16);
    temp = str.substring(16, 20);
    seqnum = Integer.parseInt(temp, 16);
    temp = str.substring(20, 24);
    serverId = Short.parseShort(temp, 16);
    temp = str.substring(20, 28);
    seqnum = Integer.parseInt(temp, 16);
  }
  /**
@@ -140,7 +139,7 @@
   */
  public String toString()
  {
    return String.format("%016x%04x%04x", timeStamp, seqnum, serverId);
    return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
  }
  /**
opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
@@ -37,21 +37,44 @@
public class ChangeNumberGenerator
{
  private long lastTime;
  private int seqnum = 0;
  private int seqnum;
  private short serverId;
  /**
   * Create a new ChangeNumber Generator.
   * @param id id to use when creating change numbers
   * @param timestamp time to start with
   * @param id id to use when creating change numbers.
   * @param timestamp time to start with.
   */
  public ChangeNumberGenerator(short id, long timestamp)
  {
    lastTime = timestamp;
    serverId = id;
    this.lastTime = timestamp;
    this.serverId = id;
    this.seqnum = 0;
  }
  /**
  * Create a new ChangeNumber Generator.
  *
  * @param id id to use when creating change numbers.
  * @param state This generator will be created in a way that makes sure that
  *              all change numbers generated will be larger than all the
  *              changenumbers currently in state.
  */
 public ChangeNumberGenerator(short id, ServerState state)
 {
   this.lastTime = TimeThread.getTime();
   for (short stateId : state)
   {
     if (this.lastTime < state.getMaxChangeNumber(stateId).getTime())
       this.lastTime = state.getMaxChangeNumber(stateId).getTime();
     if (stateId == id)
       this.seqnum = state.getMaxChangeNumber(id).getSeqnum();
   }
   this.serverId = id;
 }
  /**
   * Generate a new ChangeNumber.
   *
   * @return the generated ChangeNUmber
@@ -65,17 +88,12 @@
    {
      if (curTime > lastTime)
      {
        seqnum = 0;
        lastTime = curTime;
      }
      else
      if (seqnum++ == 0)
      {
        seqnum++;
        if (seqnum > 0xFFFF)
        {
          lastTime++;
          seqnum = 0;
        }
        lastTime++;
      }
    }
@@ -94,7 +112,6 @@
  public void adjust(ChangeNumber number)
  {
    long rcvdTime = number.getTime();
    int rcvdSeqnum = number.getSeqnum();
    /* need to synchronize with NewChangeNumber method so that we
     * protect writing of seqnum and lastTime fields
@@ -103,19 +120,8 @@
    {
      if (lastTime > rcvdTime)
        return;
      if (lastTime == rcvdTime)
      {
        if (seqnum < rcvdSeqnum)
        {
          seqnum = rcvdSeqnum;
        }
        return;
      }
      if (lastTime < rcvdTime)
      {
        lastTime = rcvdTime;
        seqnum = rcvdSeqnum;
      }
      else
        lastTime = lastTime++;
    }
  }
}
opends/src/server/org/opends/server/synchronization/common/ServerState.java
@@ -31,6 +31,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.zip.DataFormatException;
@@ -44,7 +45,7 @@
 * from each server.
 * It is exchanged with the changelog servers at connection establishment time.
 */
public class ServerState
public class ServerState implements Iterable<Short>
{
  private HashMap<Short, ChangeNumber> list;
@@ -281,4 +282,12 @@
      return result;
    }
  }
  /**
   * {@inheritDoc}
   */
  public Iterator<Short> iterator()
  {
    return list.keySet().iterator();
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,7 +27,6 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -308,11 +307,7 @@
    monitor = new SynchronizationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    // TODO : read RUV from database an make sure we don't
    // generate changeNumber smaller than ChangeNumbers in the RUV
    long startingChangeNumber = getTime();
    changeNumberGenerator = new ChangeNumberGenerator(serverId,
                                                      startingChangeNumber);
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
    /*
     * create the broker object used to publish and receive changes
     */
@@ -1213,7 +1208,8 @@
  {
    synchronized (pendingChanges)
    {
      pendingChanges.remove(changeNumber);
      PendingChange change = pendingChanges.get(changeNumber);
      change.setCommitted(true);
      pushCommittedChanges();
    }
  }
@@ -1631,7 +1627,8 @@
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if (firstChange.getOp().isSynchronizationOperation() == false)
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
@@ -65,10 +65,9 @@
        throw new DataFormatException("byte[] is not a valid modify msg");
      int pos = 1;
      /* read the changeNumber
       * it is always 24 characters long
       */
      String changenumberStr = new  String(in, pos, 24, "UTF-8");
      /* read the changeNumber */
      int length = getNextLength(in, pos);
      String changenumberStr = new  String(in, pos, length, "UTF-8");
      changeNumber = new ChangeNumber(changenumberStr);
      pos +=24;
    } catch (UnsupportedEncodingException e)
@@ -95,7 +94,8 @@
  {
    try
    {
      int length = 1 + 24;
      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
      int length = 1 + changeNumberByte.length + 1;
      byte[] resultByteArray = new byte[length];
      int pos = 1;
@@ -103,14 +103,8 @@
      resultByteArray[0] = MSG_TYPE_ACK;
      /* put the ChangeNumber */
      byte[] changeNumberByte;
      pos = addByteArray(changeNumberByte, resultByteArray, pos);
      changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8");
      for (int i=0; i<24; i++,pos++)
      {
        resultByteArray[pos] = changeNumberByte[i];
      }
      return resultByteArray;
    } catch (UnsupportedEncodingException e)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -953,13 +953,13 @@
               throws InterruptedException, DirectoryException
  {
    Entry newEntry = null ;
    int i = timeout/50;
    int i = timeout/200;
    if (i<1)
      i=1;
    newEntry = DirectoryServer.getEntry(dn);
    while ((i> 0) && ((newEntry == null) == exist))
    {
      Thread.sleep(50);
      Thread.sleep(200);
      newEntry = DirectoryServer.getEntry(dn);
      i--;
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java
@@ -24,7 +24,7 @@
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.changelog;
package org.opends.server.synchronization.changelog;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java
@@ -24,7 +24,7 @@
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.changelog;
package org.opends.server.synchronization.changelog;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
@@ -53,7 +53,7 @@
       {TimeThread.getTime(), (short) 123, (short) 45}
    };
  }
  /**
   * Test ChangeNumber constructor
   */
@@ -71,6 +71,21 @@
  }
  /**
   * Test toString and constructor from String
   */
 @Test(dataProvider = "changeNumberData")
 public void ChangeNumberEncodeDecode(long time, int seq, short id)
        throws Exception
 {
   // Create 2 ChangeNumber with the same data and check equality
   ChangeNumber cn = new ChangeNumber(time,seq,id);
   ChangeNumber cn2 = new ChangeNumber(cn.toString());
   assertEquals(cn, cn2,
       "The encoding/decoding of ChangeNumber is not reversible");
 }
  /**
   * Create ChangeNumber
   */
  @DataProvider(name = "createChangeNumber")
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
@@ -94,8 +94,8 @@
    UUID uuid = UUID.randomUUID();
    // Create the att values list of uuid
    LinkedHashSet<AttributeValue> valuesUuid = new LinkedHashSet<AttributeValue>(
        1);
    LinkedHashSet<AttributeValue> valuesUuid =
      new LinkedHashSet<AttributeValue>(1);
    valuesUuid.add(new AttributeValue(Historical.entryuuidAttrType,
        new ASN1OctetString(uuid.toString())));
    ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1);
@@ -110,23 +110,6 @@
        .getOperationalAttributes();
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    // Create the att values list of historicalAttr
    String stringVal =
      "ds-sync-hist:00000108b3a6cbb800000001:repl:00000108b3a6cbb800000002";
  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
      stringVal);
    LinkedHashSet<AttributeValue> valuesHist =
      new LinkedHashSet<AttributeValue>(1);
    valuesHist.add(val);
    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
    Attribute histAttr = new Attribute(Historical.historicalAttrType,
        "ds-sync-hist", valuesHist);
    histList.add(histAttr);
    //Add the historical att in the entry
    operationalAttributes.put(Historical.historicalAttrType,histList) ;
    // load historical from the entry
    Historical hist = Historical.load(entry);
@@ -283,25 +266,8 @@
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    // Create the att values list of historicalAttr
    String stringVal =
      "ds-sync-hist:00000108b3a6cbb800000001:del:00000108b3a6cbb800000002";
  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
      stringVal);
    LinkedHashSet<AttributeValue> valuesHist =
      new LinkedHashSet<AttributeValue>(1);
    valuesHist.add(val);
    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
    Attribute histAttr = new Attribute(Historical.historicalAttrType,
        "ds-sync-hist", valuesHist);
    histList.add(0, histAttr);
    //Add the historical att in the entry
    entry.putAttribute(Historical.historicalAttrType,histList) ;
    // load historical from the entry
    Historical hist = Historical.load(entry);
    /*
@@ -377,25 +343,8 @@
        .getOperationalAttributes();
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    // Create the att values list of historicalAttr
    String stringVal =
      "ds-sync-hist:00000108b3a6cbb800000001:add:00000108b3a6cbb800000002";
  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
      stringVal);
    LinkedHashSet<AttributeValue> valuesHist =
      new LinkedHashSet<AttributeValue>(1);
    valuesHist.add(val);
    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
    Attribute histAttr = new Attribute(Historical.historicalAttrType,
        "ds-sync-hist", valuesHist);
    histList.add(histAttr);
    //Add the historycal att in the entry
    entry.putAttribute(Historical.historicalAttrType,histList) ;
    // load historical from the entry
    Historical hist = Historical.load(entry);
    /*
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
@@ -49,7 +49,6 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -78,6 +77,7 @@
public class ProtocolWindowTest
{
  private static final int WINDOW_SIZE = 10;
  private static final int CHANGELOG_QUEUE_SIZE = 100;
  private static final String SYNCHRONIZATION_STRESS_TEST =
    "Synchronization Stress Test";
@@ -146,7 +146,7 @@
   *    the client receives the correct number of operations.
   */
  @Test(enabled=true, groups="slow")
  public void saturateAndRestart() throws Exception
  public void saturateQueueAndRestart() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
@@ -166,6 +166,7 @@
       */
      Thread.sleep(1500);
      assertTrue(checkWindows(WINDOW_SIZE));
      assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE));
      
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate();
@@ -192,8 +193,9 @@
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD synchronization message is not for the excepted DN");
      // send twice the window modify operations
      int count = WINDOW_SIZE * 2;
      // send (2 * window + changelog queue) modify operations
      // so that window + changelog queue get stuck in the changelog queue
      int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
      processModify(count);
      // let some time to the message to reach the changelog client
@@ -216,7 +218,7 @@
      /*
       * check that we received all updates
       */
      assertEquals(rcvCount, WINDOW_SIZE*2);
      assertEquals(rcvCount, count);
    }
    finally {
      broker.stop();
@@ -225,6 +227,22 @@
  }
  /**
   * Check that the Changelog queue size has correctly been configured
   * by reading the monitoring information.
   * @throws LDAPException
   */
  private boolean checkChangelogQueueSize(int changelog_queue_size)
          throws LDAPException
  {
    InternalSearchOperation op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(
            "(max-waiting-changes=" +  changelog_queue_size + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 2);
  }
  /**
   * Check that the window configuration has been successfull
   * by reading the monitoring information and checking 
   * that we do have 2 entries with the configured max-rcv-window.
@@ -238,10 +256,11 @@
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 3);
  }
  /**
   * Search that the changelog has stopped sending changes after
   * Search that the changelog has stopped sending changes after
   * having reach the limit of the window size.
   * And that the number of waiting changes is accurate.
   * Do this by checking the monitoring information.
   */
  private boolean searchUpdateSent() throws Exception
@@ -251,6 +270,16 @@
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    if (op.getEntriesSent() != 1)
      return false;
    op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(waiting-changes=" +
            (CHANGELOG_QUEUE_SIZE + WINDOW_SIZE) + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 1);
  }
@@ -316,7 +345,8 @@
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
        + "ds-cfg-changelog-server-id: 1\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
        + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
        + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    // suffix synchronized