From f312ec4a15ca08a406c045748e9d627fe1e31494 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 17 Nov 2006 13:46:39 +0000
Subject: [PATCH] The synchronization changelog monitoring information has a counter named waiting-changes that publish the number of updates known by the changelog server that have not yest been sent to each ldap server because they are too slow to replay them.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                                    |   20 +++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java                                    |    7 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ServerState.java                                     |   11 +
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java                                    |   18 +--
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java |    2 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java        |   17 ++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java                           |   60 +++++----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java    |   46 ++++++-
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java                                |   65 ++++++++--
 opendj-sdk/opends/resource/config/synchronization.ldif                                                                     |    2 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java                           |   13 -
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java            |    4 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java      |   57 ---------
 opendj-sdk/opends/resource/schema/02-config.ldif                                                                           |    7 +
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java    |    2 
 15 files changed, 194 insertions(+), 137 deletions(-)

diff --git a/opendj-sdk/opends/resource/config/synchronization.ldif b/opendj-sdk/opends/resource/config/synchronization.ldif
index 6336c65..8924b3c 100644
--- a/opendj-sdk/opends/resource/config/synchronization.ldif
+++ b/opendj-sdk/opends/resource/config/synchronization.ldif
@@ -14,7 +14,7 @@
 objectClass: top
 objectClass: ds-cfg-synchronization-provider
 ds-cfg-synchronization-provider-enabled: true
-ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization
+ds-cfg-synchronization-provider-class: org.opends.server.synchronization.plugin.MultimasterSynchronization
 
 dn: cn=example, cn=Multimaster Synchronization,cn=Synchronization Providers,cn=config
 objectClass: top
diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index 70f64a4..c05900b 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -525,6 +525,10 @@
   NAME 'ds-cfg-changelog-port'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.290
+  NAME 'ds-cfg-changelog-max-queue-size'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+  SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' )
 attributeTypes: ( 1.3.6.1.4.1.26027.1.1.278
   NAME 'ds-cfg-changelog-server-id'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -1303,7 +1307,8 @@
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
   'ds-cfg-synchronization-changelog-server-config' SUP top
   STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
-  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
+  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
+  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
   SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
   X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index b90ea1d..2a74fb9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -98,11 +98,13 @@
           new ArrayList<ConfigAttribute>();
   private ChangelogDbEnv dbEnv;
   private int rcvWindow;
+  private int queueSize;
 
   static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
   static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
   static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
+  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
 
   static final IntegerConfigAttribute changelogPortStub =
     new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -122,6 +124,10 @@
     new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
                                false, false, false, true, 0, false, 0);
 
+  static final IntegerConfigAttribute queueSizeStub =
+    new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
+                               false, false, false, true, 0, false, 0);
+
   /**
    * Check if a ConfigEntry is valid.
    * @param config The config entry that needs to be checked.
@@ -247,6 +253,16 @@
       configAttributes.add(windowAttr);
     }
 
+    IntegerConfigAttribute queueSizeAttr =
+      (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub);
+    if (queueSizeAttr == null)
+      queueSize = 10000;  // Attribute is not present : use the default value
+    else
+    {
+      queueSize = queueSizeAttr.activeIntValue();
+      configAttributes.add(queueSizeAttr);
+    }
+
     initialize(changelogServerId, changelogPort);
 
     configDn = config.getDN();
@@ -325,7 +341,7 @@
         newSocket =  listenSocket.accept();
         newSocket.setReceiveBufferSize(1000000);
         ServerHandler handler = new ServerHandler(
-                                     new SocketSession(newSocket));
+                                     new SocketSession(newSocket), queueSize);
         handler.start(null, serverId, serverURL, rcvWindow, this);
       } catch (IOException e)
       {
@@ -401,7 +417,7 @@
       socket.connect(ServerAddr, 500);
 
       ServerHandler handler = new ServerHandler(
-                                      new SocketSession(socket));
+                                      new SocketSession(socket), queueSize);
       handler.start(baseDn, serverId, serverURL, rcvWindow, this);
     }
     catch (IOException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 10663dc..a8785c3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -85,6 +85,7 @@
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
   private int maxSendDelay = 0;
+  private int maxQueueSize = 10000;
   private int restartReceiveQueue;
   private int restartSendQueue;
   private int restartReceiveDelay;
@@ -109,13 +110,16 @@
   /**
    * Creates a new server handler instance with the provided socket.
    *
-   * @param  session The ProtocolSession used by the ServerHandler to
+   * @param session The ProtocolSession used by the ServerHandler to
    *                 communicate with the remote entity.
+   * @param queueSize The maximum number of update that will be kept
+   *                  in memory by this ServerHandler.
    */
-  public ServerHandler(ProtocolSession session)
+  public ServerHandler(ProtocolSession session, int queueSize)
   {
     super("Server Handler");
     this.session = session;
+    this.maxQueueSize = queueSize;
   }
 
   /**
@@ -467,19 +471,50 @@
   {
    synchronized (msgQueue)
    {
-     /*
-      * TODO : When the server  is not able to follow, the msgQueue
-      * may become too large and therefore won't contain all the
-      * changes. Some changes may only be stored in the backing DB
-      * of the servers.
-      * The calculation should be done by asking to the each dbHandler
-      * how many changes need to be replicated and making the sum
-      * For now just return maxint in this case
-      */
+    /*
+     * When the server is up to date or close to be up to date,
+     * the number of updates to be sent is the size of the receive queue.
+     */
      if (isFollowing())
        return msgQueue.size();
      else
-       return Integer.MAX_VALUE;
+     {
+       /*
+        * When the server  is not able to follow, the msgQueue
+        * may become too large and therefore won't contain all the
+        * changes. Some changes may only be stored in the backing DB
+        * of the servers.
+        * The total size of teh receieve queue is calculated by doing
+        * the sum of the number of missing changes for every dbHandler.
+        */
+       int totalCount = 0;
+       ServerState dbState = changelogCache.getDbServerState();
+       for (short id : dbState)
+       {
+         int max = dbState.getMaxChangeNumber(id).getSeqnum();
+         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
+         if (currentChange != null)
+         {
+           int current = currentChange.getSeqnum();
+           if (current == max)
+           {
+           }
+           else if (current < max)
+           {
+             totalCount += max - current;
+           }
+           else
+           {
+             totalCount += Integer.MAX_VALUE - (current - max) + 1;
+           }
+         }
+         else
+         {
+           totalCount += max;
+         }
+       }
+       return totalCount;
+     }
    }
   }
 
@@ -576,7 +611,7 @@
       /* TODO : size should be configurable
        * and larger than max-receive-queue-size
        */
-      while (msgQueue.size() > 10000)
+      while (msgQueue.size() > maxQueueSize)
       {
         following = false;
         msgQueue.removeFirst();
@@ -687,7 +722,7 @@
           {
             synchronized (msgQueue)
             {
-              if (msgQueue.size() < 10000)
+              if (msgQueue.size() < maxQueueSize)
               {
                 following = true;
               }
@@ -1026,6 +1061,8 @@
                                  baseDn.toString()));
     attributes.add(new Attribute("waiting-changes",
                                  String.valueOf(getRcvMsgQueueSize())));
+    attributes.add(new Attribute("max-waiting-changes",
+                                 String.valueOf(maxQueueSize)));
     attributes.add(new Attribute("update-waiting-acks",
                                  String.valueOf(getWaitingAckSize())));
     attributes.add(new Attribute("update-sent",
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
index ec67d61..710f6a8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumber.java
@@ -48,11 +48,10 @@
     timeStamp = Long.parseLong(temp, 16);
 
     temp = str.substring(16, 20);
-    seqnum = Integer.parseInt(temp, 16);
-
-    temp = str.substring(20, 24);
     serverId = Short.parseShort(temp, 16);
 
+    temp = str.substring(20, 28);
+    seqnum = Integer.parseInt(temp, 16);
   }
 
   /**
@@ -140,7 +139,7 @@
    */
   public String toString()
   {
-    return String.format("%016x%04x%04x", timeStamp, seqnum, serverId);
+    return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
index c865276..c3496f9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ChangeNumberGenerator.java
@@ -37,21 +37,44 @@
 public class ChangeNumberGenerator
 {
   private long lastTime;
-  private int seqnum = 0;
+  private int seqnum;
   private short serverId;
 
   /**
    * Create a new ChangeNumber Generator.
-   * @param id id to use when creating change numbers
-   * @param timestamp time to start with
+   * @param id id to use when creating change numbers.
+   * @param timestamp time to start with.
    */
   public ChangeNumberGenerator(short id, long timestamp)
   {
-    lastTime = timestamp;
-    serverId = id;
+    this.lastTime = timestamp;
+    this.serverId = id;
+    this.seqnum = 0;
   }
 
   /**
+  * Create a new ChangeNumber Generator.
+  *
+  * @param id id to use when creating change numbers.
+  * @param state This generator will be created in a way that makes sure that
+  *              all change numbers generated will be larger than all the
+  *              changenumbers currently in state.
+  */
+ public ChangeNumberGenerator(short id, ServerState state)
+ {
+   this.lastTime = TimeThread.getTime();
+   for (short stateId : state)
+   {
+     if (this.lastTime < state.getMaxChangeNumber(stateId).getTime())
+       this.lastTime = state.getMaxChangeNumber(stateId).getTime();
+     if (stateId == id)
+       this.seqnum = state.getMaxChangeNumber(id).getSeqnum();
+   }
+   this.serverId = id;
+
+ }
+
+  /**
    * Generate a new ChangeNumber.
    *
    * @return the generated ChangeNUmber
@@ -65,17 +88,12 @@
     {
       if (curTime > lastTime)
       {
-        seqnum = 0;
         lastTime = curTime;
       }
-      else
+
+      if (seqnum++ == 0)
       {
-        seqnum++;
-        if (seqnum > 0xFFFF)
-        {
-          lastTime++;
-          seqnum = 0;
-        }
+        lastTime++;
       }
     }
 
@@ -94,7 +112,6 @@
   public void adjust(ChangeNumber number)
   {
     long rcvdTime = number.getTime();
-    int rcvdSeqnum = number.getSeqnum();
 
     /* need to synchronize with NewChangeNumber method so that we
      * protect writing of seqnum and lastTime fields
@@ -103,19 +120,8 @@
     {
       if (lastTime > rcvdTime)
         return;
-      if (lastTime == rcvdTime)
-      {
-        if (seqnum < rcvdSeqnum)
-        {
-          seqnum = rcvdSeqnum;
-        }
-        return;
-      }
-      if (lastTime < rcvdTime)
-      {
-        lastTime = rcvdTime;
-        seqnum = rcvdSeqnum;
-      }
+      else
+        lastTime = lastTime++;
     }
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ServerState.java
index e5d741f..a0b8346 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ServerState.java
@@ -31,6 +31,7 @@
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.zip.DataFormatException;
@@ -44,7 +45,7 @@
  * from each server.
  * It is exchanged with the changelog servers at connection establishment time.
  */
-public class ServerState
+public class ServerState implements Iterable<Short>
 {
   private HashMap<Short, ChangeNumber> list;
 
@@ -281,4 +282,12 @@
       return result;
     }
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Iterator<Short> iterator()
+  {
+    return list.keySet().iterator();
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index a11c2bd..cb6440f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,7 +27,6 @@
 package org.opends.server.synchronization.plugin;
 
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.util.TimeThread.getTime;
 import static org.opends.server.synchronization.common.LogMessages.*;
 import static org.opends.server.synchronization.plugin.Historical.*;
 import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -308,11 +307,7 @@
     monitor = new SynchronizationMonitor(this);
     DirectoryServer.registerMonitorProvider(monitor);
 
-    // TODO : read RUV from database an make sure we don't
-    // generate changeNumber smaller than ChangeNumbers in the RUV
-    long startingChangeNumber = getTime();
-    changeNumberGenerator = new ChangeNumberGenerator(serverId,
-                                                      startingChangeNumber);
+    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
     /*
      * create the broker object used to publish and receive changes
      */
@@ -1213,7 +1208,8 @@
   {
     synchronized (pendingChanges)
     {
-      pendingChanges.remove(changeNumber);
+      PendingChange change = pendingChanges.get(changeNumber);
+      change.setCommitted(true);
       pushCommittedChanges();
     }
   }
@@ -1631,7 +1627,8 @@
 
     while ((firstChange != null) && firstChange.isCommitted())
     {
-      if (firstChange.getOp().isSynchronizationOperation() == false)
+      if ((firstChange.getOp() != null ) &&
+          (firstChange.getOp().isSynchronizationOperation() == false))
       {
         numSentUpdates++;
         broker.publish(firstChange.getMsg());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
index cb35f38..e9c5f55 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/AckMessage.java
@@ -65,10 +65,9 @@
         throw new DataFormatException("byte[] is not a valid modify msg");
       int pos = 1;
 
-      /* read the changeNumber
-       * it is always 24 characters long
-       */
-      String changenumberStr = new  String(in, pos, 24, "UTF-8");
+      /* read the changeNumber */
+      int length = getNextLength(in, pos);
+      String changenumberStr = new  String(in, pos, length, "UTF-8");
       changeNumber = new ChangeNumber(changenumberStr);
       pos +=24;
     } catch (UnsupportedEncodingException e)
@@ -95,7 +94,8 @@
   {
     try
     {
-      int length = 1 + 24;
+      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
+      int length = 1 + changeNumberByte.length + 1;
       byte[] resultByteArray = new byte[length];
       int pos = 1;
 
@@ -103,14 +103,8 @@
       resultByteArray[0] = MSG_TYPE_ACK;
 
       /* put the ChangeNumber */
-      byte[] changeNumberByte;
+      pos = addByteArray(changeNumberByte, resultByteArray, pos);
 
-      changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8");
-
-      for (int i=0; i<24; i++,pos++)
-      {
-        resultByteArray[pos] = changeNumberByte[i];
-      }
       return resultByteArray;
     } catch (UnsupportedEncodingException e)
     {
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index c19f9a2..d5aff11 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -953,13 +953,13 @@
                throws InterruptedException, DirectoryException
   {
     Entry newEntry = null ;
-    int i = timeout/50;
+    int i = timeout/200;
     if (i<1)
       i=1;
     newEntry = DirectoryServer.getEntry(dn);
     while ((i> 0) && ((newEntry == null) == exist))
     {
-      Thread.sleep(50);
+      Thread.sleep(200);
       newEntry = DirectoryServer.getEntry(dn);
       i--;
     }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
similarity index 96%
rename from opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java
rename to opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
index fc978db..186343e 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/ChangeLogTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangeLogTestCase.java
@@ -24,7 +24,7 @@
  *
  *      Portions Copyright 2006 Sun Microsystems, Inc.
  */
-package org.opends.server.changelog;
+package org.opends.server.synchronization.changelog;
 
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
similarity index 98%
rename from opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java
rename to opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
index 11ad842..e753523 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/changelog/UpdateComparatorTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/UpdateComparatorTest.java
@@ -24,7 +24,7 @@
  *
  *      Portions Copyright 2006 Sun Microsystems, Inc.
  */
-package org.opends.server.changelog;
+package org.opends.server.synchronization.changelog;
 
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
index 07f1fc5..57da6ac 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/common/ChangeNumberTest.java
@@ -53,7 +53,7 @@
        {TimeThread.getTime(), (short) 123, (short) 45}
     };
   }
-
+ 
   /**
    * Test ChangeNumber constructor
    */
@@ -71,6 +71,21 @@
   }
 
   /**
+   * Test toString and constructor from String 
+   */
+ @Test(dataProvider = "changeNumberData")
+ public void ChangeNumberEncodeDecode(long time, int seq, short id)
+        throws Exception
+ {
+   // Create 2 ChangeNumber with the same data and check equality
+   ChangeNumber cn = new ChangeNumber(time,seq,id);
+   ChangeNumber cn2 = new ChangeNumber(cn.toString());
+   
+   assertEquals(cn, cn2,
+       "The encoding/decoding of ChangeNumber is not reversible");
+ }
+  
+  /**
    * Create ChangeNumber
    */
   @DataProvider(name = "createChangeNumber")
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
index 9fc61f1..ae1c874 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/plugin/ModifyConflictTest.java
@@ -94,8 +94,8 @@
     UUID uuid = UUID.randomUUID();
 
     // Create the att values list of uuid
-    LinkedHashSet<AttributeValue> valuesUuid = new LinkedHashSet<AttributeValue>(
-        1);
+    LinkedHashSet<AttributeValue> valuesUuid =
+      new LinkedHashSet<AttributeValue>(1);
     valuesUuid.add(new AttributeValue(Historical.entryuuidAttrType,
         new ASN1OctetString(uuid.toString())));
     ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1);
@@ -110,23 +110,6 @@
         .getOperationalAttributes();
     operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
 
-    // Create the att values list of historicalAttr
-    String stringVal =
-      "ds-sync-hist:00000108b3a6cbb800000001:repl:00000108b3a6cbb800000002";
-
-  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
-      stringVal);
-    LinkedHashSet<AttributeValue> valuesHist =
-      new LinkedHashSet<AttributeValue>(1);
-    valuesHist.add(val);
-    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
-    Attribute histAttr = new Attribute(Historical.historicalAttrType,
-        "ds-sync-hist", valuesHist);
-    histList.add(histAttr);
-
-    //Add the historical att in the entry
-    operationalAttributes.put(Historical.historicalAttrType,histList) ;
-
     // load historical from the entry
     Historical hist = Historical.load(entry);
 
@@ -283,25 +266,8 @@
 
     operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
 
-    // Create the att values list of historicalAttr
-    String stringVal =
-      "ds-sync-hist:00000108b3a6cbb800000001:del:00000108b3a6cbb800000002";
-
-  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
-      stringVal);
-    LinkedHashSet<AttributeValue> valuesHist =
-      new LinkedHashSet<AttributeValue>(1);
-    valuesHist.add(val);
-    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
-    Attribute histAttr = new Attribute(Historical.historicalAttrType,
-        "ds-sync-hist", valuesHist);
-    histList.add(0, histAttr);
-
-    //Add the historical att in the entry
-    entry.putAttribute(Historical.historicalAttrType,histList) ;
 
     // load historical from the entry
-
     Historical hist = Historical.load(entry);
 
     /*
@@ -377,25 +343,8 @@
         .getOperationalAttributes();
 
     operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
-    // Create the att values list of historicalAttr
-    String stringVal =
-      "ds-sync-hist:00000108b3a6cbb800000001:add:00000108b3a6cbb800000002";
-
-  AttributeValue val = new AttributeValue(Historical.historicalAttrType,
-      stringVal);
-    LinkedHashSet<AttributeValue> valuesHist =
-      new LinkedHashSet<AttributeValue>(1);
-    valuesHist.add(val);
-    ArrayList<Attribute> histList = new ArrayList<Attribute>(1);
-    Attribute histAttr = new Attribute(Historical.historicalAttrType,
-        "ds-sync-hist", valuesHist);
-    histList.add(histAttr);
-
-    //Add the historycal att in the entry
-    entry.putAttribute(Historical.historicalAttrType,histList) ;
-
+   
     // load historical from the entry
-
     Historical hist = Historical.load(entry);
 
     /*
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
index ce766ae..ec9c1ab 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/ProtocolWindowTest.java
@@ -49,7 +49,6 @@
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPException;
 import org.opends.server.protocols.ldap.LDAPFilter;
-import org.opends.server.synchronization.common.ServerState;
 import org.opends.server.synchronization.plugin.ChangelogBroker;
 import org.opends.server.synchronization.plugin.MultimasterSynchronization;
 import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -78,6 +77,7 @@
 public class ProtocolWindowTest
 {
   private static final int WINDOW_SIZE = 10;
+  private static final int CHANGELOG_QUEUE_SIZE = 100;
 
   private static final String SYNCHRONIZATION_STRESS_TEST =
     "Synchronization Stress Test";
@@ -146,7 +146,7 @@
    *    the client receives the correct number of operations.
    */
   @Test(enabled=true, groups="slow")
-  public void saturateAndRestart() throws Exception
+  public void saturateQueueAndRestart() throws Exception
   {
     logError(ErrorLogCategory.SYNCHRONIZATION,
         ErrorLogSeverity.NOTICE,
@@ -166,6 +166,7 @@
        */
       Thread.sleep(1500);
       assertTrue(checkWindows(WINDOW_SIZE));
+      assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE));
       
       // Create an Entry (add operation) that will be later used in the test.
       Entry tmp = personEntry.duplicate();
@@ -192,8 +193,9 @@
       assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
         "The received ADD synchronization message is not for the excepted DN");
 
-      // send twice the window modify operations
-      int count = WINDOW_SIZE * 2;
+      // send (2 * window + changelog queue) modify operations
+      // so that window + changelog queue get stuck in the changelog queue 
+      int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
       processModify(count);
 
       // let some time to the message to reach the changelog client
@@ -216,7 +218,7 @@
       /*
        * check that we received all updates
        */
-      assertEquals(rcvCount, WINDOW_SIZE*2);
+      assertEquals(rcvCount, count);
     }
     finally {
       broker.stop();
@@ -225,6 +227,22 @@
   }
 
   /**
+   * Check that the Changelog queue size has correctly been configured
+   * by reading the monitoring information.
+   * @throws LDAPException 
+   */
+  private boolean checkChangelogQueueSize(int changelog_queue_size)
+          throws LDAPException
+  {
+    InternalSearchOperation op = connection.processSearch(
+        new ASN1OctetString("cn=monitor"),
+        SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(
+            "(max-waiting-changes=" +  changelog_queue_size + ")"));
+    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+    return (op.getEntriesSent() == 2);
+  }
+
+  /**
    * Check that the window configuration has been successfull
    * by reading the monitoring information and checking 
    * that we do have 2 entries with the configured max-rcv-window.
@@ -238,10 +256,11 @@
     assertEquals(op.getResultCode(), ResultCode.SUCCESS);
     return (op.getEntriesSent() == 3);
   }
-
+  
   /**
-   * Search that the changelog has stopped sending changes after 
+   * Search that the changelog has stopped sending changes after
    * having reach the limit of the window size.
+   * And that the number of waiting changes is accurate.
    * Do this by checking the monitoring information.
    */
   private boolean searchUpdateSent() throws Exception
@@ -251,6 +270,16 @@
         SearchScope.WHOLE_SUBTREE,
         LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
     assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+    if (op.getEntriesSent() != 1)
+      return false;
+    
+    op = connection.processSearch(
+        new ASN1OctetString("cn=monitor"),
+        SearchScope.WHOLE_SUBTREE,
+        LDAPFilter.decode("(waiting-changes=" +
+            (CHANGELOG_QUEUE_SIZE + WINDOW_SIZE) + ")"));
+    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+
     return (op.getEntriesSent() == 1);
   }
 
@@ -316,7 +345,8 @@
         + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
         + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
         + "ds-cfg-changelog-server-id: 1\n"
-        + "ds-cfg-window-size: " + WINDOW_SIZE;
+        + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
+        + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
     changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
 
     // suffix synchronized

--
Gitblit v1.10.0