From 5d3ab17c74a50bd020fc175c213f81700134c9f8 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 18 Jan 2007 09:20:48 +0000
Subject: [PATCH] put back the 980 revision that had been removed because of a regression.  - fix the regression (schema problem)  - disabled the ChangelogTest.stopChangelog() test that appear to cause    some hangs.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                             |    5 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java                           |   21 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java                             |   62 ++-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java |   73 +++-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java      |   18 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java     |   32 +-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java |  401 ++++++++++++++++++++++++++++--
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java                          |    1 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java                    |  100 ++++---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java              |   18 
 10 files changed, 563 insertions(+), 168 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 2a74fb9..f88251c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -340,6 +340,7 @@
       {
         newSocket =  listenSocket.accept();
         newSocket.setReceiveBufferSize(1000000);
+        newSocket.setTcpNoDelay(true);
         ServerHandler handler = new ServerHandler(
                                      new SocketSession(newSocket), queueSize);
         handler.start(null, serverId, serverURL, rcvWindow, this);
@@ -414,11 +415,12 @@
                      InetAddress.getByName(hostname), Integer.parseInt(port));
       Socket socket = new Socket();
       socket.setReceiveBufferSize(1000000);
+      socket.setTcpNoDelay(true);
       socket.connect(ServerAddr, 500);
 
       ServerHandler handler = new ServerHandler(
                                       new SocketSession(socket), queueSize);
-      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
+     handler.start(baseDn, serverId, this.serverURL, rcvWindow, this);
     }
     catch (IOException e)
     {
@@ -545,6 +547,7 @@
     }
 
     dbEnv.shutdown();
+    DirectoryServer.deregisterConfigurableComponent(this);
   }
 
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
index fc94e70..29e92ba 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@
   public ChangelogCursor openReadCursor(ChangeNumber changeNumber)
                 throws DatabaseException, Exception
   {
-    if (changeNumber == null)
-      changeNumber = readFirstChange();
-    if (changeNumber == null)
-      return null;
     return new ChangelogCursor(changeNumber);
   }
 
@@ -319,13 +315,16 @@
     {
       cursor = db.openCursor(txn, null);
 
-      DatabaseEntry key = new ChangelogKey(startingChangeNumber);
-      DatabaseEntry data = new DatabaseEntry();
-
-      if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
-        OperationStatus.SUCCESS)
+      if (startingChangeNumber != null)
       {
-        throw new Exception("ChangeNumber not available");
+        key = new ChangelogKey(startingChangeNumber);
+        data = new DatabaseEntry();
+
+        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
+          OperationStatus.SUCCESS)
+        {
+          throw new Exception("ChangeNumber not available");
+        }
       }
     }
 
@@ -373,7 +372,7 @@
     }
 
     /**
-     * Get the next ChangeNumber inthe database from this Cursor.
+     * Get the next ChangeNumber in the database from this Cursor.
      *
      * @return The next ChangeNumber in the database from this cursor.
      * @throws DatabaseException In case of underlying database problem.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
index ac53f2b..64bb929 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@
   private boolean shutdown = false;
   private boolean done = false;
   private DirectoryThread thread = null;
+  private Object flushLock = new Object();
 
   /**
    * Creates a New dbHandler associated to a given LDAP server.
@@ -204,6 +205,12 @@
   public ChangelogIterator generateIterator(ChangeNumber changeNumber)
                            throws DatabaseException, Exception
   {
+    /*
+     * make sure to flush some changes in the database so that
+     * we don't create the iterator on an empty database when the
+     * dbHandler has just been started.
+     */
+    flush();
     return new ChangelogIterator(serverId, db, changeNumber);
   }
 
@@ -320,17 +327,22 @@
       while ((size < 5000 ) &&  (!finished))
       {
         ChangeNumber changeNumber = cursor.nextChangeNumber();
-        if ((changeNumber != null) && (!changeNumber.equals(lastChange))
-            && (changeNumber.older(trimDate)))
+        if (changeNumber != null)
         {
-          size++;
-          cursor.delete();
+          if ((!changeNumber.equals(lastChange))
+              && (changeNumber.older(trimDate)))
+          {
+            size++;
+            cursor.delete();
+          }
+          else
+          {
+            firstChange = changeNumber;
+            finished = true;
+          }
         }
         else
-        {
-          firstChange = changeNumber;
           finished = true;
-        }
       }
 
       cursor.close();
@@ -350,19 +362,21 @@
 
     do
     {
-      // get N messages to save in the DB
-      List<UpdateMessage> changes = getChanges(500);
+      synchronized(flushLock)
+      {
+        // get N messages to save in the DB
+        List<UpdateMessage> changes = getChanges(500);
 
-      // if no more changes to save exit immediately.
-      if ((changes == null) || ((size = changes.size()) == 0))
-        return;
+        // if no more changes to save exit immediately.
+        if ((changes == null) || ((size = changes.size()) == 0))
+          return;
 
-      // save the change to the stable storage.
-      db.addEntries(changes);
+        // save the change to the stable storage.
+        db.addEntries(changes);
 
-      // remove the changes from the list of changes to be saved.
-      clear(changes.size());
-
+        // remove the changes from the list of changes to be saved.
+        clear(changes.size());
+      }
     } while (size >=500);
   }
 
@@ -387,19 +401,17 @@
       attributes.add(new Attribute("changelog-database",
                                    String.valueOf(serverId)));
       attributes.add(new Attribute("base-dn", baseDn.toString()));
-      ChangeNumber first = getFirstChange();
-      ChangeNumber last = getLastChange();
-      if (first != null)
+      if (firstChange != null)
       {
-        Date firstTime = new Date(first.getTime());
+        Date firstTime = new Date(firstChange.getTime());
         attributes.add(new Attribute("first-change",
-            first.toString() + " " + firstTime.toString()));
+            firstChange.toString() + " " + firstTime.toString()));
       }
-      if (last != null)
+      if (lastChange != null)
       {
-        Date lastTime = new Date(last.getTime());
+        Date lastTime = new Date(lastChange.getTime());
         attributes.add(new Attribute("last-change",
-            last.toString() + " " + lastTime.toString()));
+            lastChange.toString() + " " + lastTime.toString()));
       }
 
       return attributes;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index f96b10a..b42428b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -184,6 +184,7 @@
               InetAddress.getByName(hostname), Integer.parseInt(port));
           Socket socket = new Socket();
           socket.setReceiveBufferSize(1000000);
+          socket.setTcpNoDelay(true);
           socket.connect(ServerAddr, 500);
           session = new SocketSession(socket);
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
index f2c2d70..fd1f3d1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,57 +118,18 @@
       return;
 
     savedStatus = true;
-
-    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
-
-    if (values.size() == 0)
-      return;
-
-    LDAPAttribute attr =
-      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
-    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
-    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
-    mods.add(mod);
-
-    boolean done = false;
-    while (!done)
+    ResultCode resultCode = updateStateEntry();
+    if (resultCode != ResultCode.SUCCESS)
     {
-      /*
-       * Generate a modify operation on the Server State Entry :
-       * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
-       */
-      ModifyOperation op =
-        new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
-            InternalClientConnection.nextMessageID(),
-            new ArrayList<Control>(0), serverStateAsn1Dn,
-            mods);
-      op.setInternalOperation(true);
-      op.setSynchronizationOperation(true);
-
-      op.run();
-      ResultCode resultCode = op.getResultCode();
-      if (resultCode != ResultCode.SUCCESS)
+      if (resultCode == ResultCode.NO_SUCH_OBJECT)
       {
-        if (resultCode == ResultCode.NO_SUCH_OBJECT)
-        {
-          createStateEntry();
-        }
-        else
-        {
-          savedStatus = false;
-          int msgID = MSGID_ERROR_UPDATING_RUV;
-          String message = getMessage(msgID,
-              op.getResultCode().getResultCodeName(),
-              op.toString(), op.getErrorMessage(),
-              baseDn.toString());
-          logError(ErrorLogCategory.SYNCHRONIZATION,
-              ErrorLogSeverity.SEVERE_ERROR,
-              message, msgID);
-          break;
-        }
+        createStateEntry();
       }
       else
-        done = true;
+      {
+        savedStatus = false;
+
+      }
     }
   }
 
@@ -297,6 +258,51 @@
   }
 
   /**
+   * Save the current values of this PersistentState object
+   * in the appropiate entry of the database.
+   *
+   * @return a ResultCode indicating if the method was successfull.
+   */
+  private ResultCode updateStateEntry()
+  {
+    /*
+     * Generate a modify operation on the Server State Entry :
+     * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
+     */
+    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
+
+    if (values.size() == 0)
+      return ResultCode.SUCCESS;
+
+    LDAPAttribute attr =
+      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
+    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
+    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
+    mods.add(mod);
+
+    ModifyOperation op =
+      new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
+          InternalClientConnection.nextMessageID(),
+          new ArrayList<Control>(0), serverStateAsn1Dn,
+          mods);
+    op.setInternalOperation(true);
+    op.setSynchronizationOperation(true);
+
+    op.run();
+
+    ResultCode result = op.getResultCode();
+    if (result != ResultCode.SUCCESS)
+    {
+      int msgID = MSGID_ERROR_UPDATING_RUV;
+      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
+          op.toString(), op.getErrorMessage(), baseDn.toString());
+      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
+          message, msgID);
+    }
+    return result;
+  }
+
+  /**
    * Get the Dn where the ServerState is stored.
    * @return Returns the serverStateDn.
    */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
index e4922e4..5d04efe 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -95,14 +95,14 @@
     logError(ErrorLogCategory.SYNCHRONIZATION,
         ErrorLogSeverity.NOTICE,
         "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1);
-    
+
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
 
     ChangelogBroker broker = openChangelogSession(baseDn, (short) 13,
-        WINDOW_SIZE, 8989, 1000);
+        WINDOW_SIZE, 8989, 1000, true);
 
     try {
-      
+
       /* Test that changelog monitor and synchro plugin monitor informations
        * publish the correct window size.
        * This allows both the check the monitoring code and to test that
@@ -111,7 +111,7 @@
       Thread.sleep(1500);
       assertTrue(checkWindows(WINDOW_SIZE));
       assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE));
-      
+
       // Create an Entry (add operation) that will be later used in the test.
       Entry tmp = personEntry.duplicate();
       AddOperation addOp = new AddOperation(connection,
@@ -138,7 +138,7 @@
         "The received ADD synchronization message is not for the excepted DN");
 
       // send (2 * window + changelog queue) modify operations
-      // so that window + changelog queue get stuck in the changelog queue 
+      // so that window + changelog queue get stuck in the changelog queue
       int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
       processModify(count);
 
@@ -173,7 +173,7 @@
   /**
    * Check that the Changelog queue size has correctly been configured
    * by reading the monitoring information.
-   * @throws LDAPException 
+   * @throws LDAPException
    */
   private boolean checkChangelogQueueSize(int changelog_queue_size)
           throws LDAPException
@@ -188,7 +188,7 @@
 
   /**
    * Check that the window configuration has been successfull
-   * by reading the monitoring information and checking 
+   * by reading the monitoring information and checking
    * that we do have 2 entries with the configured max-rcv-window.
    */
   private boolean checkWindows(int windowSize) throws LDAPException
@@ -200,7 +200,7 @@
     assertEquals(op.getResultCode(), ResultCode.SUCCESS);
     return (op.getEntriesSent() == 3);
   }
-  
+
   /**
    * Search that the changelog has stopped sending changes after
    * having reach the limit of the window size.
@@ -216,7 +216,7 @@
     assertEquals(op.getResultCode(), ResultCode.SUCCESS);
     if (op.getEntriesSent() != 1)
       return false;
-    
+
     op = connection.processSearch(
         new ASN1OctetString("cn=monitor"),
         SearchScope.WHOLE_SUBTREE,
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
index a0d621f..9e31a81 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -89,7 +89,7 @@
   private String changeLogStringDN;
 
   private BrokerReader reader = null;
-  
+
   /**
    * A "person" entry
    */
@@ -106,13 +106,13 @@
     logError(ErrorLogCategory.SYNCHRONIZATION,
         ErrorLogSeverity.NOTICE,
         "Starting Synchronization StressTest : fromServertoBroker" , 1);
-    
+
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
     final int TOTAL_MESSAGES = 1000;
     cleanEntries();
 
     ChangelogBroker broker =
-      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000);
+      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true);
     Monitor monitor = new Monitor("stress test monitor");
     DirectoryServer.registerMonitorProvider(monitor);
 
@@ -212,7 +212,7 @@
 
     // Create an internal connection
     connection = new InternalClientConnection();
-    
+
     // Disable schema check
     schemaCheck = DirectoryServer.checkSchema();
     DirectoryServer.setCheckSchema(false);
@@ -411,7 +411,7 @@
       return count;
     }
   }
-  
+
   private class Monitor extends MonitorProvider
   {
     protected Monitor(String threadName)
@@ -445,7 +445,7 @@
     public void updateMonitorData()
     {
       // nothing to do
-    
+
     }
 
     @Override
@@ -453,7 +453,7 @@
     throws ConfigException, InitializationException
     {
       // nothing to do
-    
+
     }
 
     @Override
@@ -463,7 +463,7 @@
       return 0;
     }
 
-    
-    
+
+
   }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index 70308cb..0081bd1 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.synchronization.common.ServerState;
 import org.opends.server.synchronization.plugin.ChangelogBroker;
 import org.opends.server.synchronization.plugin.MultimasterSynchronization;
 import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -52,7 +53,7 @@
 import org.testng.annotations.BeforeClass;
 
 /**
- * An abstract class that all synchronization unit test should extend. 
+ * An abstract class that all synchronization unit test should extend.
  */
 @Test(groups = { "precommit", "synchronization" })
 public abstract class SynchronizationTestCase extends DirectoryServerTestCase
@@ -91,7 +92,7 @@
 
   /**
    * Set up the environment for performing the tests in this suite.
-   * 
+   *
    * @throws Exception
    *         If the environment could not be set up.
    */
@@ -100,21 +101,24 @@
   {
     // This test suite depends on having the schema available.
     TestCaseUtils.startServer();
-    
+    schemaCheck = DirectoryServer.checkSchema();
+
     // Create an internal connection
     connection = new InternalClientConnection();
   }
-  
+
   /**
    * Open a changelog session to the local Changelog server.
    *
    */
   protected ChangelogBroker openChangelogSession(
-      final DN baseDn, short serverId, int window_size, int port, int timeout)
+      final DN baseDn, short serverId, int window_size,
+      int port, int timeout, boolean emptyOldChanges)
           throws Exception, SocketException
   {
     PersistentServerState state = new PersistentServerState(baseDn);
-    state.loadState();
+    if (emptyOldChanges)
+      state.loadState();
     ChangelogBroker broker = new ChangelogBroker(
         state, baseDn, serverId, 0, 0, 0, 0, window_size);
     ArrayList<String> servers = new ArrayList<String>(1);
@@ -122,22 +126,45 @@
     broker.start(servers);
     if (timeout != 0)
       broker.setSoTimeout(timeout);
-    /*
-     * loop receiving update until there is nothing left
-     * to make sure that message from previous tests have been consumed.
-     */
-    try
+    if (emptyOldChanges)
     {
-      while (true)
+      /*
+       * loop receiving update until there is nothing left
+       * to make sure that message from previous tests have been consumed.
+       */
+      try
       {
-        broker.receive();
+        while (true)
+        {
+          broker.receive();
+        }
       }
+      catch (Exception e)
+      { }
     }
-    catch (Exception e)
-    { }
     return broker;
   }
-  
+
+  /**
+   * Open a new session to the Changelog Server
+   * starting with a given ServerState.
+   */
+  protected ChangelogBroker openChangelogSession(
+      final DN baseDn, short serverId, int window_size,
+      int port, int timeout, ServerState state)
+          throws Exception, SocketException
+  {
+    ChangelogBroker broker = new ChangelogBroker(
+        state, baseDn, serverId, 0, 0, 0, 0, window_size);
+    ArrayList<String> servers = new ArrayList<String>(1);
+    servers.add("localhost:" + port);
+    broker.start(servers);
+    if (timeout != 0)
+      broker.setSoTimeout(timeout);
+
+    return broker;
+  }
+
   /**
    * suppress all the entries created by the tests in this class
    */
@@ -149,11 +176,11 @@
     {
       while (true)
       {
-        DN dn = entryList.removeLast(); 
+        DN dn = entryList.removeLast();
         op = new DeleteOperation(connection, InternalClientConnection
             .nextOperationID(), InternalClientConnection.nextMessageID(), null,
             dn);
-       
+
         op.run();;
       }
     }
@@ -172,7 +199,7 @@
   public void classCleanUp() throws Exception
   {
     DirectoryServer.setCheckSchema(schemaCheck);
-  
+
     // WORKAROUND FOR BUG #639 - BEGIN -
     if (mms != null)
     {
@@ -180,7 +207,7 @@
       mms.finalizeSynchronizationProvider();
     }
     // WORKAROUND FOR BUG #639 - END -
-  
+
     cleanEntries();
   }
 
@@ -196,7 +223,7 @@
     assertNotNull(DirectoryServer.getConfigEntry(DN
         .decode(synchroPluginStringDN)),
         "Unable to add the Multimaster synchronization plugin");
-  
+
     // WORKAROUND FOR BUG #639 - BEGIN -
     DN dn = DN.decode(synchroPluginStringDN);
     ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
@@ -212,14 +239,14 @@
     }
     DirectoryServer.registerSynchronizationProvider(mms);
     // WORKAROUND FOR BUG #639 - END -
-  
+
     //
     // Add the changelog server
     DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
     assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
         "Unable to add the changeLog server");
     entryList.add(changeLogEntry.getDN());
-  
+
     //
     // We also have a replicated suffix (synchronization domain)
     DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index f76d76c..670198f 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -84,7 +84,7 @@
   private String user1entrysecondUUID;
 
   private String user1entryUUID;
-  
+
   /**
    * A "person" entry
    */
@@ -254,7 +254,7 @@
      * This must use a serverId different from the LDAP server ID
      */
     ChangelogBroker broker =
-      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000);
+      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
 
     /*
      * Create a Change number generator to generate new changenumbers
@@ -660,7 +660,7 @@
     cleanEntries();
 
     ChangelogBroker broker =
-      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000);
+      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true);
     try {
       ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
 
@@ -883,7 +883,7 @@
           break;
         }
       }
-      
+
       if (lock == null)
       {
         throw new Exception("could not lock entry " + dn);
@@ -892,8 +892,8 @@
       try
       {
         newEntry = DirectoryServer.getEntry(personWithUUIDEntry.getDN());
-      
-     
+
+
         if (newEntry == null)
           fail("The entry " + personWithUUIDEntry.getDN() +
           " has incorrectly been deleted from the database.");
@@ -903,13 +903,13 @@
         AttributeType attrType =
           DirectoryServer.getAttributeType(attrTypeStr, true);
         found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
-       
+
       }
       finally
       {
         LockManager.unlock(dn, lock);
       }
-      
+
       if (found != hasAttribute)
         Thread.sleep(100);
     } while ((--count > 0) && (found != hasAttribute));
@@ -918,7 +918,7 @@
 
   /**
    *  Get the entryUUID for a given DN.
-   *  
+   *
    * @throws Exception if the entry does not exist or does not have
    *                   an entryUUID.
    */
@@ -932,7 +932,7 @@
     while ((count> 0) && (found == null))
     {
       Thread.sleep(100);
-      
+
       Lock lock = null;
       for (int i=0; i < 3; i++)
       {
@@ -942,7 +942,7 @@
           break;
         }
       }
-      
+
       if (lock == null)
       {
         throw new Exception("could not lock entry " + dn);
@@ -951,7 +951,7 @@
       try
       {
         newEntry = DirectoryServer.getEntry(dn);
-    
+
         if (newEntry != null)
         {
           List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
@@ -991,11 +991,11 @@
     while ((count> 0) && (found != exist))
     {
       Thread.sleep(200);
-      
+
       found = DirectoryServer.entryExists(dn);
       count--;
     }
-    
+
     Lock lock = null;
     for (int i=0; i < 3; i++)
     {
@@ -1005,7 +1005,7 @@
         break;
       }
     }
-    
+
     if (lock == null)
     {
       throw new Exception("could not lock entry " + dn);
@@ -1062,7 +1062,7 @@
 
     Thread.sleep(2000);
     ChangelogBroker broker =
-      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000);
+      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true);
     try
     {
       ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 02a2521..562b2c9 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -30,33 +30,56 @@
 
 import org.opends.server.TestCaseUtils;
 import org.opends.server.config.ConfigEntry;
+import org.opends.server.core.DirectoryServer;
 import org.opends.server.synchronization.SynchronizationTestCase;
 import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ChangeNumberGenerator;
+import org.opends.server.synchronization.common.ServerState;
 import org.opends.server.synchronization.plugin.ChangelogBroker;
 import org.opends.server.synchronization.protocol.DeleteMsg;
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
+import org.opends.server.util.TimeThread;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import static org.testng.Assert.*;
 
 /**
- * Tests for the chngelog service code.
+ * Tests for the changelog service code.
  */
 
 public class ChangelogTest extends SynchronizationTestCase
 {
   /**
-   * Basic test of the changelog code.
-   * Create a changelog server, connect 2 clients and exchange
-   * messages between the clients.
+   * The changelog server that will be used in this test.
    */
-  @Test()
-  public void changelogBasic() throws Exception
+  private Changelog changelog = null;
+
+  /**
+   * The port of the changelog server.
+   */
+  private int changelogPort;
+
+  private ChangeNumber firstChangeNumberServer1 = null;
+  private ChangeNumber secondChangeNumberServer1 = null;
+  private ChangeNumber firstChangeNumberServer2 = null;
+  private ChangeNumber secondChangeNumberServer2 = null;
+
+
+  /**
+   * Before starting the tests, start the server and configure a
+   * changelog server.
+   */
+  @BeforeClass()
+  public void configure() throws Exception
   {
-    // find  a free port
+    TestCaseUtils.startServer();
+
+    //  find  a free port for the changelog server
     ServerSocket socket = TestCaseUtils.bindFreePort();
-    int changelogPort = socket.getLocalPort();
+    changelogPort = socket.getLocalPort();
     socket.close();
 
     String changelogLdif =
@@ -65,39 +88,363 @@
         + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
         + "cn: Changelog Server\n"
         + "ds-cfg-changelog-port: "+ changelogPort + "\n"
-        + "ds-cfg-changelog-server-id: 1\n";
+        + "ds-cfg-changelog-server-id: 1\n"
+        + "ds-cfg-window-size: 100";
     Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
     ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
-    Changelog changelog = new Changelog(changelogConfig);
+    changelog = new Changelog(changelogConfig);
+  }
 
-    ChangelogBroker broker1 = null;
-    ChangelogBroker broker2 = null;
-    
+  /**
+   * Basic test of the changelog code :
+   *  Connect 2 clients to the changelog server and exchange messages
+   *  between the clients.
+   *
+   * Note : Other tests in this file depends on this test and may need to
+   *        change if this test is modified.
+   */
+  @Test()
+  public void changelogBasic() throws Exception
+  {
+    ChangelogBroker server1 = null;
+    ChangelogBroker server2 = null;
+
     try {
-      broker1 = openChangelogSession(
-          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000);
-      broker2 = openChangelogSession(
-          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000);
+      /*
+       * Open a sender session and a receiver session to the changelog
+       */
+      server1 = openChangelogSession(
+          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
+          1000, true);
+      server2 = openChangelogSession(
+          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
+          1000, true);
 
-      ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1);
-      DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid");
-      broker1.publish(msg);
-      SynchronizationMessage msg2 = broker2.receive();
+      /*
+       * Create change numbers for the messages sent from server 1
+       * with current time  sequence 1 and with current time + 2 sequence 2
+       */
+      long time = TimeThread.getTime();
+      firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1);
+      secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1);
+
+      /*
+       * Create change numbers for the messages sent from server 2
+       * with current time  sequence 1 and with current time + 3 sequence 2
+       */
+      firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2);
+      secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2);
+
+      /*
+       * Send and receive a Delete Msg from server 1 to server 2
+       */
+      DeleteMsg msg =
+        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1,
+                      "uid");
+      server1.publish(msg);
+      SynchronizationMessage msg2 = server2.receive();
       if (msg2 instanceof DeleteMsg)
       {
         DeleteMsg del = (DeleteMsg) msg2;
-        assertTrue(del.toString().equals(msg2.toString()));
+        assertTrue(del.toString().equals(msg.toString()),
+            "Changelog basic : incorrect message body received.");
       }
       else
-        fail("Changelog transmission failed");
+        fail("Changelog basic : incorrect message type received.");
+
+      /*
+       * Send and receive a second Delete Msg
+       */
+      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
+      server1.publish(msg);
+      msg2 = server2.receive();
+      if (msg2 instanceof DeleteMsg)
+      {
+        DeleteMsg del = (DeleteMsg) msg2;
+        assertTrue(del.toString().equals(msg.toString()),
+            "Changelog basic : incorrect message body received.");
+      }
+      else
+        fail("Changelog basic : incorrect message type received.");
+
+      /*
+       * Send and receive a Delete Msg from server 1 to server 2
+       */
+      msg =
+        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
+                      "other-uid");
+      server2.publish(msg);
+      msg2 = server1.receive();
+      if (msg2 instanceof DeleteMsg)
+      {
+        DeleteMsg del = (DeleteMsg) msg2;
+        assertTrue(del.toString().equals(msg.toString()),
+            "Changelog basic : incorrect message body received.");
+      }
+      else
+        fail("Changelog basic : incorrect message type received.");
+
+      /*
+       * Send and receive a second Delete Msg
+       */
+      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
+      server2.publish(msg);
+      msg2 = server1.receive();
+      if (msg2 instanceof DeleteMsg)
+      {
+        DeleteMsg del = (DeleteMsg) msg2;
+        assertTrue(del.toString().equals(msg.toString()),
+            "Changelog basic : incorrect message body received.");
+      }
+      else
+        fail("Changelog basic : incorrect message type received.");
     }
     finally
     {
+      if (server1 != null)
+        server1.stop();
+      if (server2 != null)
+        server2.stop();
+    }
+  }
+
+  /**
+   * Test that a new client see the change that was sent in the
+   * previous test.
+   */
+  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
+  public void newClient() throws Exception
+  {
+    ChangelogBroker broker = null;
+
+    try {
+      broker =
+        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
+                             100, changelogPort, 1000, false);
+
+      SynchronizationMessage msg2 = broker.receive();
+      if (!(msg2 instanceof DeleteMsg))
+        fail("Changelog basic transmission failed");
+      else
+      {
+        DeleteMsg del = (DeleteMsg) msg2;
+        assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
+            "The first message received by a new client was the wrong one."
+            + del.getChangeNumber() + " " + firstChangeNumberServer1);
+      }
+    }
+    finally
+    {
+      if (broker != null)
+        broker.stop();
+    }
+  }
+
+
+
+  /**
+   * Test that a client that has already seen some changes now receive
+   * the correct next change.
+   */
+  private void newClientWithChanges(
+      ServerState state, ChangeNumber nextChangeNumber) throws Exception
+  {
+    ChangelogBroker broker = null;
+
+    /*
+     * Connect to the changelog server using the state created above.
+     */
+    try {
+      broker =
+        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
+                             100, changelogPort, 1000, state);
+
+      SynchronizationMessage msg2 = broker.receive();
+      if (!(msg2 instanceof DeleteMsg))
+        fail("Changelog basic transmission failed");
+      else
+      {
+        DeleteMsg del = (DeleteMsg) msg2;
+        assertTrue(del.getChangeNumber().equals(nextChangeNumber),
+            "The second message received by a new client was the wrong one."
+            + del.getChangeNumber() + " " + nextChangeNumber);
+      }
+    }
+    finally
+    {
+      if (broker != null)
+        broker.stop();
+    }
+  }
+
+  /**
+   * Test that a client that has already seen the first change now see the
+   * second change
+   */
+  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
+  public void newClientWithFirstChanges() throws Exception
+  {
+    /*
+     * Create a ServerState updated with the first changes from both servers
+     * done in test changelogBasic.
+     */
+    ServerState state = new ServerState();
+    state.update(firstChangeNumberServer1);
+    state.update(firstChangeNumberServer2);
+
+    newClientWithChanges(state, secondChangeNumberServer1);
+  }
+
+  /**
+   * Test that a client that has already seen the first change from server 1
+   * now see the first change from server 2
+   */
+  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
+  public void newClientWithChangefromServer1() throws Exception
+  {
+    /*
+     * Create a ServerState updated with the first change from server 1
+     */
+    ServerState state = new ServerState();
+    state.update(firstChangeNumberServer1);
+
+    newClientWithChanges(state, firstChangeNumberServer2);
+  }
+
+  /**
+   * Test that a client that has already seen the first chaneg from server 2
+   * now see the first change from server 1
+   */
+  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
+  public void newClientWithChangefromServer2() throws Exception
+  {
+    /*
+     * Create a ServerState updated with the first change from server 1
+     */
+    ServerState state = new ServerState();
+    state.update(firstChangeNumberServer2);
+
+    newClientWithChanges(state, firstChangeNumberServer1);
+  }
+
+  /**
+   * Test that a client that has not seen the second change from server 1
+   * now receive it.
+   */
+  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
+  public void newClientLateServer1() throws Exception
+  {
+    /*
+     * Create a ServerState updated with the first change from server 1
+     */
+    ServerState state = new ServerState();
+    state.update(secondChangeNumberServer2);
+    state.update(firstChangeNumberServer1);
+
+    newClientWithChanges(state, secondChangeNumberServer1);
+  }
+
+  /**
+   * Test that newClient() and newClientWithFirstChange() still works
+   * after stopping and restarting the changelog server.
+   */
+  @Test(enabled=false, dependsOnMethods = { "changelogBasic" })
+  public void stopChangelog() throws Exception
+  {
+    changelog.shutdown();
+    configure();
+    newClient();
+    newClientWithFirstChanges();
+    newClientWithChangefromServer1();
+    newClientWithChangefromServer2();
+  }
+
+  /**
+   * Stress test from client using the ChangelogBroker API
+   * to the changelog server.
+   */
+  @Test(enabled=false, groups="slow")
+  public void stressFromBrokertoChangelog() throws Exception
+  {
+    ChangelogBroker server = null;
+
+    try
+    {
+      /*
+       * Open a sender session
+       */
+      server = openChangelogSession(
+          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
+          1000, true);
+
+      BrokerReader reader = new BrokerReader(server);
+      reader.start();
+
+      ChangeNumberGenerator gen =
+        new ChangeNumberGenerator((short)5 , (long) 0);
+      /*
+       * Simple loop creating changes and sending them
+       * to the changelog server.
+       */
+      for (int i = 0; i< 100000; i++)
+      {
+        DeleteMsg msg =
+          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
+          "uid");
+        server.publish(msg);
+      }
+    }
+    finally
+    {
+      if (server != null)
+        server.stop();
+    }
+  }
+
+  /**
+   * After the tests stop the changelog server.
+   */
+  @AfterClass()
+  public void shutdown() throws Exception
+  {
+    if (changelog != null)
       changelog.shutdown();
-      if (broker1 != null)
-        broker1.stop();
-      if (broker2 != null)
-        broker2.stop();
+  }
+  /**
+   * Continuously reads messages from a changelog broker until there is nothing
+   * left. Count the number of received messages.
+   */
+  private class BrokerReader extends Thread
+  {
+    private ChangelogBroker broker;
+
+    /**
+     * Creates a new Stress Test Reader
+     * @param broker
+     */
+    public BrokerReader(ChangelogBroker broker)
+    {
+      this.broker = broker;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run()
+    {
+      // loop receiving messages until either we get a timeout
+      // because there is nothing left or an error condition happens.
+      try
+      {
+        while (true)
+        {
+          SynchronizationMessage msg = broker.receive();
+          if (msg == null)
+            break;
+        }
+      } catch (Exception e) {
+      }
     }
   }
 }

--
Gitblit v1.10.0