mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
17.46.2006 46e6061d63562ce021ef8f3b5062d3eba1c2db4e
The synchronization changelog monitoring information has a counter named
waiting-changes that publishes the number of updates known by the changelog
server that have not yet been sent to each LDAP server because that server is too slow to replay them.

This is calculated from the list of changes in memory when this list is small
enough. However, when this list becomes too large it is not possible to keep
all the changes in memory and the changes therefore go to disk.
This monitoring information therefore becomes unavailable and currently returns MAXINT.

This is unfortunate because this is the time when it is the most important.

Unfortunately the Java edition of the Berkeley DB does not provide a way to
know the record number of a given db entry and therefore we can't rely on the DB
to tell us how many changes remain in the db after the current change.

This fix therefore changes the format of the ChangeNumber so that the ChangeNumber
now uses consecutive numbers for its seqnum part.

The number of missing changes can therefore be calculated by a simple subtraction
of the current seqnum from the largest seqnum.
The drawback of this method is that it is only accurate if the majority of update
operations done on the masters are successful, because this method also counts the failed
operations.

This fix also modifies the ProtocolWindowTest in order to add tests of this waiting-changes
monitoring information.

This fix also makes the size of the memory queue of messages configurable.
2 files renamed
13 files modified
331 ■■■■■ changed files
opends/resource/config/synchronization.ldif 2 ●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java 65 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java 60 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/common/ServerState.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java 18 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java 17 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java 57 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java 46 ●●●● patch | view | raw | blame | history
opends/resource/config/synchronization.ldif
@@ -14,7 +14,7 @@
objectClass: top
objectClass: ds-cfg-synchronization-provider
ds-cfg-synchronization-provider-enabled: true
ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization
ds-cfg-synchronization-provider-class: org.opends.server.synchronization.plugin.MultimasterSynchronization
dn: cn=example, cn=Multimaster Synchronization,cn=Synchronization Providers,cn=config
objectClass: top
opends/resource/schema/02-config.ldif
@@ -525,6 +525,10 @@
  NAME 'ds-cfg-changelog-port'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.290
  NAME 'ds-cfg-changelog-max-queue-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.278
  NAME 'ds-cfg-changelog-server-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -1303,7 +1307,8 @@
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
  'ds-cfg-synchronization-changelog-server-config' SUP top
  STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
  SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
  X-ORIGIN 'OpenDS Directory Server' )
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -98,11 +98,13 @@
          new ArrayList<ConfigAttribute>();
  private ChangelogDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final IntegerConfigAttribute changelogPortStub =
    new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -122,6 +124,10 @@
    new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
                               false, false, false, true, 0, false, 0);
  static final IntegerConfigAttribute queueSizeStub =
    new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
                               false, false, false, true, 0, false, 0);
  /**
   * Check if a ConfigEntry is valid.
   * @param config The config entry that needs to be checked.
@@ -247,6 +253,16 @@
      configAttributes.add(windowAttr);
    }
    IntegerConfigAttribute queueSizeAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub);
    if (queueSizeAttr == null)
      queueSize = 10000;  // Attribute is not present : use the default value
    else
    {
      queueSize = queueSizeAttr.activeIntValue();
      configAttributes.add(queueSizeAttr);
    }
    initialize(changelogServerId, changelogPort);
    configDn = config.getDN();
@@ -325,7 +341,7 @@
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket));
                                     new SocketSession(newSocket), queueSize);
        handler.start(null, serverId, serverURL, rcvWindow, this);
      } catch (IOException e)
      {
@@ -401,7 +417,7 @@
      socket.connect(ServerAddr, 500);
      ServerHandler handler = new ServerHandler(
                                      new SocketSession(socket));
                                      new SocketSession(socket), queueSize);
      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
    }
    catch (IOException e)
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -85,6 +85,7 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 10000;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
@@ -109,13 +110,16 @@
  /**
   * Creates a new server handler instance with the provided socket.
   *
   * @param  session The ProtocolSession used by the ServerHandler to
   * @param session The ProtocolSession used by the ServerHandler to
   *                 communicate with the remote entity.
   * @param queueSize The maximum number of update that will be kept
   *                  in memory by this ServerHandler.
   */
  public ServerHandler(ProtocolSession session)
  public ServerHandler(ProtocolSession session, int queueSize)
  {
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
  }
  /**
@@ -467,19 +471,50 @@
  {
   synchronized (msgQueue)
   {
     /*
      * TODO : When the server  is not able to follow, the msgQueue
      * may become too large and therefore won't contain all the
      * changes. Some changes may only be stored in the backing DB
      * of the servers.
      * The calculation should be done by asking to the each dbHandler
      * how many changes need to be replicated and making the sum
      * For now just return maxint in this case
      */
    /*
     * When the server is up to date or close to be up to date,
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.size();
     else
       return Integer.MAX_VALUE;
     {
       /*
        * When the server  is not able to follow, the msgQueue
        * may become too large and therefore won't contain all the
        * changes. Some changes may only be stored in the backing DB
        * of the servers.
        * The total size of teh receieve queue is calculated by doing
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = changelogCache.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
         if (currentChange != null)
         {
           int current = currentChange.getSeqnum();
           if (current == max)
           {
           }
           else if (current < max)
           {
             totalCount += max - current;
           }
           else
           {
             totalCount += Integer.MAX_VALUE - (current - max) + 1;
           }
         }
         else
         {
           totalCount += max;
         }
       }
       return totalCount;
     }
   }
  }
@@ -576,7 +611,7 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while (msgQueue.size() > 10000)
      while (msgQueue.size() > maxQueueSize)
      {
        following = false;
        msgQueue.removeFirst();
@@ -687,7 +722,7 @@
          {
            synchronized (msgQueue)
            {
              if (msgQueue.size() < 10000)
              if (msgQueue.size() < maxQueueSize)
              {
                following = true;
              }
@@ -1026,6 +1061,8 @@
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("max-waiting-changes",
                                 String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-waiting-acks",
                                 String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("update-sent",
opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
@@ -48,11 +48,10 @@
    timeStamp = Long.parseLong(temp, 16);
    temp = str.substring(16, 20);
    seqnum = Integer.parseInt(temp, 16);
    temp = str.substring(20, 24);
    serverId = Short.parseShort(temp, 16);
    temp = str.substring(20, 28);
    seqnum = Integer.parseInt(temp, 16);
  }
  /**
@@ -140,7 +139,7 @@
   */
  public String toString()
  {
    return String.format("%016x%04x%04x", timeStamp, seqnum, serverId);
    return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
  }
  /**
opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
@@ -37,21 +37,44 @@
public class ChangeNumberGenerator
{
  private long lastTime;
  private int seqnum = 0;
  private int seqnum;
  private short serverId;
  /**
   * Create a new ChangeNumber Generator.
   * @param id id to use when creating change numbers
   * @param timestamp time to start with
   * @param id id to use when creating change numbers.
   * @param timestamp time to start with.
   */
  public ChangeNumberGenerator(short id, long timestamp)
  {
    lastTime = timestamp;
    serverId = id;
    this.lastTime = timestamp;
    this.serverId = id;
    this.seqnum = 0;
  }
  /**
  * Create a new ChangeNumber Generator.
  *
  * @param id id to use when creating change numbers.
  * @param state This generator will be created in a way that makes sure that
  *              all change numbers generated will be larger than all the
  *              changenumbers currently in state.
  */
 public ChangeNumberGenerator(short id, ServerState state)
 {
   this.lastTime = TimeThread.getTime();
   for (short stateId : state)
   {
     if (this.lastTime < state.getMaxChangeNumber(stateId).getTime())
       this.lastTime = state.getMaxChangeNumber(stateId).getTime();
     if (stateId == id)
       this.seqnum = state.getMaxChangeNumber(id).getSeqnum();
   }
   this.serverId = id;
 }
  /**
   * Generate a new ChangeNumber.
   *
   * @return the generated ChangeNUmber
@@ -65,17 +88,12 @@
    {
      if (curTime > lastTime)
      {
        seqnum = 0;
        lastTime = curTime;
      }
      else
      if (seqnum++ == 0)
      {
        seqnum++;
        if (seqnum > 0xFFFF)
        {
          lastTime++;
          seqnum = 0;
        }
        lastTime++;
      }
    }
@@ -94,7 +112,6 @@
  public void adjust(ChangeNumber number)
  {
    long rcvdTime = number.getTime();
    int rcvdSeqnum = number.getSeqnum();
    /* need to synchronize with NewChangeNumber method so that we
     * protect writing of seqnum and lastTime fields
@@ -103,19 +120,8 @@
    {
      if (lastTime > rcvdTime)
        return;
      if (lastTime == rcvdTime)
      {
        if (seqnum < rcvdSeqnum)
        {
          seqnum = rcvdSeqnum;
        }
        return;
      }
      if (lastTime < rcvdTime)
      {
        lastTime = rcvdTime;
        seqnum = rcvdSeqnum;
      }
      else
        lastTime = lastTime++;
    }
  }
}
opends/src/server/org/opends/server/synchronization/common/ServerState.java
@@ -31,6 +31,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.zip.DataFormatException;
@@ -44,7 +45,7 @@
 * from each server.
 * It is exchanged with the changelog servers at connection establishment time.
 */
public class ServerState
public class ServerState implements Iterable<Short>
{
  private HashMap<Short, ChangeNumber> list;
@@ -281,4 +282,12 @@
      return result;
    }
  }
  /**
   * {@inheritDoc}
   */
  public Iterator<Short> iterator()
  {
    return list.keySet().iterator();
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,7 +27,6 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -308,11 +307,7 @@
    monitor = new SynchronizationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    // TODO : read RUV from database an make sure we don't
    // generate changeNumber smaller than ChangeNumbers in the RUV
    long startingChangeNumber = getTime();
    changeNumberGenerator = new ChangeNumberGenerator(serverId,
                                                      startingChangeNumber);
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
    /*
     * create the broker object used to publish and receive changes
     */
@@ -1213,7 +1208,8 @@
  {
    synchronized (pendingChanges)
    {
      pendingChanges.remove(changeNumber);
      PendingChange change = pendingChanges.get(changeNumber);
      change.setCommitted(true);
      pushCommittedChanges();
    }
  }
@@ -1631,7 +1627,8 @@
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if (firstChange.getOp().isSynchronizationOperation() == false)
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
@@ -65,10 +65,9 @@
        throw new DataFormatException("byte[] is not a valid modify msg");
      int pos = 1;
      /* read the changeNumber
       * it is always 24 characters long
       */
      String changenumberStr = new  String(in, pos, 24, "UTF-8");
      /* read the changeNumber */
      int length = getNextLength(in, pos);
      String changenumberStr = new  String(in, pos, length, "UTF-8");
      changeNumber = new ChangeNumber(changenumberStr);
      pos +=24;
    } catch (UnsupportedEncodingException e)
@@ -95,7 +94,8 @@
  {
    try
    {
      int length = 1 + 24;
      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
      int length = 1 + changeNumberByte.length + 1;
      byte[] resultByteArray = new byte[length];
      int pos = 1;
@@ -103,14 +103,8 @@
      resultByteArray[0] = MSG_TYPE_ACK;
      /* put the ChangeNumber */
      byte[] changeNumberByte;
      pos = addByteArray(changeNumberByte, resultByteArray, pos);
      changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8");
      for (int i=0; i<24; i++,pos++)
      {
        resultByteArray[pos] = changeNumberByte[i];
      }
      return resultByteArray;
    } catch (UnsupportedEncodingException e)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -953,13 +953,13 @@
               throws InterruptedException, DirectoryException
  {
    Entry newEntry = null ;
    int i = timeout/50;
    int i = timeout/200;
    if (i<1)
      i=1;
    newEntry = DirectoryServer.getEntry(dn);
    while ((i> 0) && ((newEntry == null) == exist))
    {
      Thread.sleep(50);
      Thread.sleep(200);
      newEntry = DirectoryServer.getEntry(dn);
      i--;
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java
@@ -24,7 +24,7 @@
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.changelog;
package org.opends.server.synchronization.changelog;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java
@@ -24,7 +24,7 @@
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.changelog;
package org.opends.server.synchronization.changelog;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
@@ -53,7 +53,7 @@
       {TimeThread.getTime(), (short) 123, (short) 45}
    };
  }
  /**
   * Test ChangeNumber constructor
   */
@@ -71,6 +71,21 @@
  }
  /**
   * Test toString and constructor from String
   */
 @Test(dataProvider = "changeNumberData")
 public void ChangeNumberEncodeDecode(long time, int seq, short id)
        throws Exception
 {
   // Create 2 ChangeNumber with the same data and check equality
   ChangeNumber cn = new ChangeNumber(time,seq,id);
   ChangeNumber cn2 = new ChangeNumber(cn.toString());
   assertEquals(cn, cn2,
       "The encoding/decoding of ChangeNumber is not reversible");
 }
  /**
   * Create ChangeNumber
   */
  @DataProvider(name = "createChangeNumber")
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
@@ -94,8 +94,8 @@
    UUID uuid = UUID.randomUUID();
    // Create the att values list of uuid
    LinkedHashSet<AttributeValue> valuesUuid = new LinkedHashSet<AttributeValue>(
        1);
    LinkedHashSet<AttributeValue> valuesUuid =
      new LinkedHashSet<AttributeValue>(1);
    valuesUuid.add(new AttributeValue(Historical.entryuuidAttrType,
        new ASN1OctetString(uuid.toString())));
    ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1);
@@ -110,23 +110,6 @@
        .getOperationalAttributes();
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    // Create the att values list of historicalAttr
    String stringVal =
      "ds-sync-hist:00000108b3a6cbb800000001:repl:00000108b3a6cbb800000002";
  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
      stringVal);
    LinkedHashSet<AttributeValue> valuesHist =
      new LinkedHashSet<AttributeValue>(1);
    valuesHist.add(val);
    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
    Attribute histAttr = new Attribute(Historical.historicalAttrType,
        "ds-sync-hist", valuesHist);
    histList.add(histAttr);
    //Add the historical att in the entry
    operationalAttributes.put(Historical.historicalAttrType,histList) ;
    // load historical from the entry
    Historical hist = Historical.load(entry);
@@ -283,25 +266,8 @@
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    // Create the att values list of historicalAttr
    String stringVal =
      "ds-sync-hist:00000108b3a6cbb800000001:del:00000108b3a6cbb800000002";
  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
      stringVal);
    LinkedHashSet<AttributeValue> valuesHist =
      new LinkedHashSet<AttributeValue>(1);
    valuesHist.add(val);
    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
    Attribute histAttr = new Attribute(Historical.historicalAttrType,
        "ds-sync-hist", valuesHist);
    histList.add(0, histAttr);
    //Add the historical att in the entry
    entry.putAttribute(Historical.historicalAttrType,histList) ;
    // load historical from the entry
    Historical hist = Historical.load(entry);
    /*
@@ -377,25 +343,8 @@
        .getOperationalAttributes();
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    // Create the att values list of historicalAttr
    String stringVal =
      "ds-sync-hist:00000108b3a6cbb800000001:add:00000108b3a6cbb800000002";
  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
      stringVal);
    LinkedHashSet<AttributeValue> valuesHist =
      new LinkedHashSet<AttributeValue>(1);
    valuesHist.add(val);
    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
    Attribute histAttr = new Attribute(Historical.historicalAttrType,
        "ds-sync-hist", valuesHist);
    histList.add(histAttr);
    //Add the historycal att in the entry
    entry.putAttribute(Historical.historicalAttrType,histList) ;
    // load historical from the entry
    Historical hist = Historical.load(entry);
    /*
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
@@ -49,7 +49,6 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -78,6 +77,7 @@
public class ProtocolWindowTest
{
  private static final int WINDOW_SIZE = 10;
  private static final int CHANGELOG_QUEUE_SIZE = 100;
  private static final String SYNCHRONIZATION_STRESS_TEST =
    "Synchronization Stress Test";
@@ -146,7 +146,7 @@
   *    the client receives the correct number of operations.
   */
  @Test(enabled=true, groups="slow")
  public void saturateAndRestart() throws Exception
  public void saturateQueueAndRestart() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
@@ -166,6 +166,7 @@
       */
      Thread.sleep(1500);
      assertTrue(checkWindows(WINDOW_SIZE));
      assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE));
      
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate();
@@ -192,8 +193,9 @@
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD synchronization message is not for the excepted DN");
      // send twice the window modify operations
      int count = WINDOW_SIZE * 2;
      // send (2 * window + changelog queue) modify operations
      // so that window + changelog queue get stuck in the changelog queue
      int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
      processModify(count);
      // let some time to the message to reach the changelog client
@@ -216,7 +218,7 @@
      /*
       * check that we received all updates
       */
      assertEquals(rcvCount, WINDOW_SIZE*2);
      assertEquals(rcvCount, count);
    }
    finally {
      broker.stop();
@@ -225,6 +227,22 @@
  }
  /**
   * Check that the Changelog queue size has correctly been configured
   * by reading the monitoring information.
   * @throws LDAPException
   */
  private boolean checkChangelogQueueSize(int changelog_queue_size)
          throws LDAPException
  {
    InternalSearchOperation op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(
            "(max-waiting-changes=" +  changelog_queue_size + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 2);
  }
  /**
   * Check that the window configuration has been successfull
   * by reading the monitoring information and checking 
   * that we do have 2 entries with the configured max-rcv-window.
@@ -238,10 +256,11 @@
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 3);
  }
  /**
   * Search that the changelog has stopped sending changes after
   * Search that the changelog has stopped sending changes after
   * having reach the limit of the window size.
   * And that the number of waiting changes is accurate.
   * Do this by checking the monitoring information.
   */
  private boolean searchUpdateSent() throws Exception
@@ -251,6 +270,16 @@
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    if (op.getEntriesSent() != 1)
      return false;
    op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(waiting-changes=" +
            (CHANGELOG_QUEUE_SIZE + WINDOW_SIZE) + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 1);
  }
@@ -316,7 +345,8 @@
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
        + "ds-cfg-changelog-server-id: 1\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
        + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
        + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    // suffix synchronized