From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508  These changes implement a window mechanism in the sycnhronization protocol.

---
 opends/resource/schema/02-config.ldif                                                                   |    9 
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java     |  494 +++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java |   67 +
 opends/src/server/org/opends/server/changelog/Changelog.java                                            |   92 +
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java             |  153 ++-
 opends/src/server/org/opends/server/synchronization/ServerStartMessage.java                             |   32 
 opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java                         |    4 
 opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java                          |   38 
 opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java                          |   33 
 opends/src/server/org/opends/server/synchronization/WindowMessage.java                                  |  141 +++
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java    |  331 ++++----
 opends/src/server/org/opends/server/changelog/DbHandler.java                                            |   12 
 opends/src/server/org/opends/server/changelog/ServerReader.java                                         |    7 
 opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java                     |    2 
 opends/src/server/org/opends/server/changelog/ChangelogDB.java                                          |  176 ---
 opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java                         |   86 +-
 opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java                          |   62 +
 opends/src/server/org/opends/server/changelog/ChangelogCache.java                                       |   29 
 opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java                                       |  245 ++++++
 opends/src/server/org/opends/server/changelog/ServerHandler.java                                        |  122 ++
 opends/src/server/org/opends/server/changelog/SocketSession.java                                        |    4 
 opends/src/server/org/opends/server/synchronization/ChangelogBroker.java                                |  101 ++
 22 files changed, 1,723 insertions(+), 517 deletions(-)

diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 03a17ef..8c8f7ce 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -547,6 +547,10 @@
   NAME 'ds-cfg-bind-with-dn-requires-password'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.288
+  NAME 'ds-cfg-window-size'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 attributeTypes: ( 1.3.6.1.4.1.26027.1.1.164
   NAME 'ds-cfg-max-receive-queue'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
@@ -1239,7 +1243,8 @@
   STRUCTURAL MUST ( ds-cfg-changelog-server $ ds-cfg-directory-server-id
   $ ds-cfg-synchronization-dn )
   MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $
-  ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay )
+  ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $
+  ds-cfg-window-size )
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.59
   NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator
@@ -1295,7 +1300,7 @@
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
   'ds-cfg-synchronization-changelog-server-config' SUP top
   STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
-  MAY ( ds-cfg-changelog-server $ cn ) X-ORIGIN 'OpenDS Directory Server' )
+  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
   SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
   X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opends/src/server/org/opends/server/changelog/Changelog.java b/opends/src/server/org/opends/server/changelog/Changelog.java
index 6660b1b..497e2f4 100644
--- a/opends/src/server/org/opends/server/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/changelog/Changelog.java
@@ -71,14 +71,14 @@
  */
 public class Changelog implements Runnable, ConfigurableComponent
 {
-  static private short serverId;
-  static private String serverURL;
+  private short serverId;
+  private String serverURL;
 
-  private static ServerSocket listenSocket;
-  private static Thread myListenThread;
-  private static Thread myConnectThread;
+  private ServerSocket listenSocket;
+  private Thread myListenThread;
+  private Thread myConnectThread;
 
-  private static boolean runListen = true;
+  private boolean runListen = true;
 
   /* The list of changelog servers configured by the administrator */
   private List<String> changelogServers;
@@ -86,19 +86,22 @@
   /* This table is used to store the list of dn for which we are currently
    * handling servers.
    */
-  private static HashMap<DN, ChangelogCache> baseDNs =
+  private HashMap<DN, ChangelogCache> baseDNs =
           new HashMap<DN, ChangelogCache>();
 
   private String localURL = "null";
-  private static boolean shutdown = false;
+  private boolean shutdown = false;
   private short changelogServerId;
   private DN configDn;
   private List<ConfigAttribute> configAttributes =
           new ArrayList<ConfigAttribute>();
+  private ChangelogDbEnv dbEnv;
+  private int rcvWindow;
 
   static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
   static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
+  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
 
   static final IntegerConfigAttribute changelogPortStub =
     new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -114,6 +117,10 @@
         "changelog server information", true,
         true, false);
 
+  static final IntegerConfigAttribute windowStub =
+    new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
+                               false, false, false, true, 0, false, 0);
+
   /**
    * Check if a ConfigEntry is valid.
    * @param config The config entry that needs to be checked.
@@ -229,6 +236,16 @@
     }
     configAttributes.add(changelogServer);
 
+    IntegerConfigAttribute windowAttr =
+      (IntegerConfigAttribute) config.getConfigAttribute(windowStub);
+    if (windowAttr == null)
+      rcvWindow = 100;  // Attribute is not present : use the default value
+    else
+    {
+      rcvWindow = windowAttr.activeIntValue();
+      configAttributes.add(windowAttr);
+    }
+
     initialize(changelogServerId, changelogPort);
 
     configDn = config.getDN();
@@ -305,9 +322,10 @@
       try
       {
         newSocket =  listenSocket.accept();
+        newSocket.setReceiveBufferSize(1000000);
         ServerHandler handler = new ServerHandler(
                                      new SocketSession(newSocket));
-        handler.start(null);
+        handler.start(null, serverId, serverURL, rcvWindow, this);
       } catch (IOException e)
       {
         // ignore
@@ -378,11 +396,12 @@
       InetSocketAddress ServerAddr = new InetSocketAddress(
                      InetAddress.getByName(hostname), Integer.parseInt(port));
       Socket socket = new Socket();
+      socket.setReceiveBufferSize(1000000);
       socket.connect(ServerAddr, 500);
 
       ServerHandler handler = new ServerHandler(
                                       new SocketSession(socket));
-      handler.start(baseDn);
+      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
     }
     catch (IOException e)
     {
@@ -406,8 +425,9 @@
        * Initialize the changelog database.
        * TODO : the changelog db path should be configurable
        */
-      ChangelogDB.initialize(DirectoryServer.getServerRoot() + File.separator
-          + "changelogDb");
+      dbEnv = new ChangelogDbEnv(
+          DirectoryServer.getServerRoot() + File.separator + "changelogDb",
+          this);
 
       /*
        * create changelog cache
@@ -421,7 +441,9 @@
       String localAdddress = InetAddress.getLocalHost().getHostAddress();
       serverURL = localhostname + ":" + String.valueOf(changelogPort);
       localURL = localAdddress + ":" + String.valueOf(changelogPort);
-      listenSocket = new ServerSocket(changelogPort);
+      listenSocket = new ServerSocket();
+      listenSocket.setReceiveBufferSize(1000000);
+      listenSocket.bind(new InetSocketAddress(changelogPort));
 
       /*
        * create working threads
@@ -460,32 +482,12 @@
   }
 
   /**
-   * Retrieves the unique identifier for this changelog.
-   *
-   * @return  The unique identifier for this changelog.
-   */
-  public static short getServerId()
-  {
-    return serverId;
-  }
-
-  /**
-   * Retrieves the host and port for this changelog, separated by a colon.
-   *
-   * @return  The host and port for this changelog, separated by a colon.
-   */
-  public static String getServerURL()
-  {
-    return serverURL;
-  }
-
-  /**
    * Get the ChangelogCache associated to the base DN given in parameter.
    *
    * @param baseDn The base Dn for which the ChangelogCache must be returned.
    * @return The ChangelogCache associated to the base DN given in parameter.
    */
-  public static ChangelogCache getChangelogCache(DN baseDn)
+  public ChangelogCache getChangelogCache(DN baseDn)
   {
     ChangelogCache changelogCache;
 
@@ -493,7 +495,7 @@
     {
       changelogCache = baseDNs.get(baseDn);
       if (changelogCache == null)
-        changelogCache = new ChangelogCache(baseDn);
+        changelogCache = new ChangelogCache(baseDn, this);
       baseDNs.put(baseDn, changelogCache);
     }
 
@@ -503,7 +505,7 @@
   /**
    * Shutdown the Changelog service and all its connections.
    */
-  public static void shutdown()
+  public void shutdown()
   {
     shutdown = true;
 
@@ -525,6 +527,22 @@
       changelogCache.shutdown();
     }
 
-    ChangelogDB.shutdownDbEnvironment();
+    dbEnv.shutdown();
+  }
+
+
+  /**
+   * Creates a new DB handler for this Changelog and the serverId and
+   * DN given in parameter.
+   *
+   * @param id The serverId for which the dbHandler must be created.
+   * @param baseDn The DN for which the dbHandler muste be created.
+   * @return The new DB handler for this Changelog and the serverId and
+   *         DN given in parameter.
+   * @throws DatabaseException in case of underlying database problem.
+   */
+  public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
+  {
+    return new DbHandler(id, baseDn, this, dbEnv);
   }
 }
diff --git a/opends/src/server/org/opends/server/changelog/ChangelogCache.java b/opends/src/server/org/opends/server/changelog/ChangelogCache.java
index c92a90b..398bcf3 100644
--- a/opends/src/server/org/opends/server/changelog/ChangelogCache.java
+++ b/opends/src/server/org/opends/server/changelog/ChangelogCache.java
@@ -100,15 +100,18 @@
    */
   private Map<Short, DbHandler> sourceDbHandlers =
     new ConcurrentHashMap<Short, DbHandler>();
+  private Changelog changelog;
 
   /**
    * Creates a new ChangelogCache associated to the DN baseDn.
    *
    * @param baseDn The baseDn associated to the ChangelogCache.
+   * @param changelog the Changelog that created this changelog cache.
    */
-  public ChangelogCache(DN baseDn)
+  public ChangelogCache(DN baseDn, Changelog changelog)
   {
     this.baseDn = baseDn;
+    this.changelog = changelog;
   }
 
   /**
@@ -161,7 +164,7 @@
       {
         try
         {
-          dbHandler = new DbHandler(id, baseDn);
+          dbHandler = changelog.newDbHandler(id, baseDn);
         } catch (DatabaseException e)
         {
           /*
@@ -169,14 +172,13 @@
            * from at least one LDAP server.
            * This changelog therefore can't do it's job properly anymore
            * and needs to close all its connections and shutdown itself.
-           * TODO : log error
            */
           int    msgID   = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
           String message = getMessage(msgID) + stackTraceToSingleLineString(e);
           logError(ErrorLogCategory.SYNCHRONIZATION,
                    ErrorLogSeverity.SEVERE_ERROR,
                    message, msgID);
-          Changelog.shutdown();
+          changelog.shutdown();
           return;
         }
         sourceDbHandlers.put(id, dbHandler);
@@ -411,14 +413,15 @@
   /**
    * creates a new ChangelogDB with specified identifier.
    * @param id the identifier of the new ChangelogDB.
-   * @param baseDn the baseDn of the new ChangelogDB.
+   * @param db the new db.
+   *
    * @throws DatabaseException If a database error happened.
    */
-  public void newDb(short id, DN baseDn) throws DatabaseException
+  public void newDb(short id, DbHandler db) throws DatabaseException
   {
     synchronized (sourceDbHandlers)
     {
-      sourceDbHandlers.put(id , new DbHandler(id, baseDn));
+      sourceDbHandlers.put(id , db);
     }
   }
 
@@ -441,6 +444,16 @@
    */
   public void ack(AckMessage message, short fromServerId)
   {
+    /*
+     * there are 2 possible cases here :
+     *  - the message that was acked comes from a server to which
+     *    we are directly connected.
+     *    In this case, we can find the handler from the connectedServers map
+     *  - the message that was acked comes from a server to which we are not
+     *    connected.
+     *    In this case we need to find the changelog server that forwarded
+     *    the change and send back the ack to this server.
+     */
     ServerHandler handler = connectedServers.get(
                                        message.getChangeNumber().getServerId());
     if (handler != null)
@@ -551,6 +564,4 @@
   {
     return "ChangelogCache " + baseDn;
   }
-
-
 }
diff --git a/opends/src/server/org/opends/server/changelog/ChangelogDB.java b/opends/src/server/org/opends/server/changelog/ChangelogDB.java
index 45c6544..c476e40 100644
--- a/opends/src/server/org/opends/server/changelog/ChangelogDB.java
+++ b/opends/src/server/org/opends/server/changelog/ChangelogDB.java
@@ -32,10 +32,8 @@
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.util.List;
-import java.io.File;
 import java.io.UnsupportedEncodingException;
 
-import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DN;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
@@ -45,10 +43,7 @@
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.Transaction;
@@ -60,143 +55,34 @@
  */
 public class ChangelogDB
 {
-  private static Environment dbEnvironment = null;
   private Database db = null;
-  private static Database stateDb = null;
-  private String stringId = null;
+  private ChangelogDbEnv dbenv = null;
+  private Changelog changelog;
+  private Short serverId;
+  private DN baseDn;
 
   /**
    * Creates a new database or open existing database that will be used
    * to store and retrieve changes from an LDAP server.
    * @param serverId Identifier of the LDAP server.
    * @param baseDn baseDn of the LDAP server.
+   * @param changelog the Changelog that needs to be shutdown
+   * @param dbenv the Db encironemnet to use to create the db
    * @throws DatabaseException if a database problem happened
    */
-  public ChangelogDB(Short serverId, DN baseDn)
+  public ChangelogDB(Short serverId, DN baseDn, Changelog changelog,
+                     ChangelogDbEnv dbenv)
                      throws DatabaseException
   {
-    try {
-      stringId = serverId.toString() + " " + baseDn.toNormalizedString();
-      byte[] byteId = stringId.getBytes("UTF-8");
+    this.serverId = serverId;
+    this.baseDn = baseDn;
+    this.dbenv = dbenv;
+    this.changelog = changelog;
+    db = dbenv.getOrAddDb(serverId, baseDn);
 
-      // Open the database. Create it if it does not already exist.
-      DatabaseConfig dbConfig = new DatabaseConfig();
-      dbConfig.setAllowCreate(true);
-      dbConfig.setTransactional(true);
-
-      db = dbEnvironment.openDatabase(null, stringId, dbConfig);
-
-      DatabaseEntry key = new DatabaseEntry();
-      key.setData(byteId);
-      DatabaseEntry data = new DatabaseEntry();
-      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
-      if (status == OperationStatus.NOTFOUND)
-      {
-        Transaction txn = dbEnvironment.beginTransaction(null, null);
-        try {
-          data.setData(byteId);
-          stateDb.put(txn, key, data);
-          txn.commitWriteNoSync();
-        } catch (DatabaseException dbe)
-        {
-          // Abort the txn and propagate the Exception to the caller
-          txn.abort();
-          throw dbe;
-        }
-      }
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      // never happens
-    }
   }
 
   /**
-   * Initialize this class.
-   * Creates Db environment that will be used to create databases.
-   * It also reads the currently known databases from the "changelogstate"
-   * database.
-   * @param path Path where the backing files must be created.
-   * @throws DatabaseException If a DatabaseException occured that prevented
-   *                           the initialization to happen.
-   * @throws ChangelogDBException If a changelog internal error caused
-   *                              a failure of the changelog processing.
-   */
-  public static void initialize(String path) throws DatabaseException,
-                                                    ChangelogDBException
-  {
-    EnvironmentConfig envConfig = new EnvironmentConfig();
-
-    /* Create the DB Environment that will be used for all
-     * the Changelog activities related to the db
-     */
-    envConfig.setAllowCreate(true);
-    envConfig.setTransactional(true);
-    envConfig.setConfigParam("je.cleaner.expunge", "true");
-    // TODO : the DB cache size should be configurable
-    // For now set 5M is OK for being efficient in 64M total for the JVM
-    envConfig.setConfigParam("je.maxMemory", "5000000");
-    dbEnvironment = new Environment(new File(path), envConfig);
-
-    /*
-     * One database is created to store the update from each LDAP
-     * server in the topology.
-     * The database "changelogstate" is used to store the list of all
-     * the servers that have been seen in the past.
-     */
-    DatabaseConfig dbConfig = new DatabaseConfig();
-    dbConfig.setAllowCreate(true);
-    dbConfig.setTransactional(true);
-
-    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
-    Cursor cursor = stateDb.openCursor(null, null);
-    DatabaseEntry key = new DatabaseEntry();
-    DatabaseEntry data = new DatabaseEntry();
-    try
-    {
-      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
-      while (status == OperationStatus.SUCCESS)
-      {
-        try
-        {
-          String stringData = new String(data.getData(), "UTF-8");
-          String[] str = stringData.split(" ", 2);
-          short serverId = new Short(str[0]);
-          DN baseDn = null;
-          try
-          {
-            baseDn = DN.decode(str[1]);
-          } catch (DirectoryException e)
-          {
-            int    msgID   = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER;
-            String message = getMessage(msgID, str[1]);
-            logError(ErrorLogCategory.SYNCHRONIZATION,
-                     ErrorLogSeverity.SEVERE_ERROR,
-                     message, msgID);
-          }
-          Changelog.getChangelogCache(baseDn).newDb(serverId, baseDn);
-        } catch (NumberFormatException e)
-        {
-          // should never happen
-          throw new ChangelogDBException(0,
-              "changelog state database has a wrong format");
-        } catch (UnsupportedEncodingException e)
-        {
-          // should never happens
-          throw new ChangelogDBException(0, "need UTF-8 support");
-        }
-        status = cursor.getNext(key, data, LockMode.DEFAULT);
-      }
-      cursor.close();
-
-    } catch (DatabaseException dbe) {
-      cursor.close();
-      throw dbe;
-    }
-  }
-
-
-  /**
    * add a list of changes to the underlying db.
    *
    * @param changes The list of changes to add to the underlying db.
@@ -207,7 +93,7 @@
 
     try
     {
-      txn = dbEnvironment.beginTransaction(null, null);
+      txn = dbenv.beginTransaction();
 
       for (UpdateMessage change : changes)
       {
@@ -224,7 +110,7 @@
           logError(ErrorLogCategory.SYNCHRONIZATION,
                    ErrorLogSeverity.SEVERE_ERROR,
                    message, msgID);
-          Changelog.shutdown();
+          changelog.shutdown();
         }
       }
 
@@ -238,7 +124,7 @@
       logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message, msgID);
-      Changelog.shutdown();
+      changelog.shutdown();
       if (txn != null)
       {
         try
@@ -264,7 +150,7 @@
     } catch (DatabaseException e)
     {
       int    msgID   = MSGID_EXCEPTION_CLOSING_DATABASE;
-      String message = getMessage(msgID, stringId)  +
+      String message = getMessage(msgID, this.toString())  +
                                  stackTraceToSingleLineString(e);
       logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
@@ -355,7 +241,7 @@
       logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message, msgID);
-      Changelog.shutdown();
+      changelog.shutdown();
       return null;
     }
   }
@@ -396,28 +282,18 @@
       logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message, msgID);
-      Changelog.shutdown();
+      changelog.shutdown();
       return null;
     }
   }
 
   /**
-   * Shutdown the Db environment.
+   * {@inheritDoc}
    */
-  public static void shutdownDbEnvironment()
+  @Override
+  public String toString()
   {
-    try
-    {
-      stateDb.close();
-      dbEnvironment.close();
-    } catch (DatabaseException e)
-    {
-      int    msgID   = MSGID_ERROR_CLOSING_CHANGELOG_ENV;
-      String message = getMessage(msgID) + stackTraceToSingleLineString(e);
-      logError(ErrorLogCategory.SYNCHRONIZATION,
-               ErrorLogSeverity.SEVERE_ERROR,
-               message, msgID);
-    }
+    return serverId.toString() + baseDn.toString();
   }
 
   /**
@@ -455,7 +331,7 @@
 
     private ChangelogCursor() throws DatabaseException
     {
-      txn = dbEnvironment.beginTransaction(null, null);
+      txn = dbenv.beginTransaction();
       cursor = db.openCursor(txn, null);
     }
 
@@ -477,7 +353,7 @@
         logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.SEVERE_ERROR,
                  message, msgID);
-        Changelog.shutdown();
+        changelog.shutdown();
       }
       if (txn != null)
       {
@@ -491,7 +367,7 @@
           logError(ErrorLogCategory.SYNCHRONIZATION,
                    ErrorLogSeverity.SEVERE_ERROR,
                    message, msgID);
-          Changelog.shutdown();
+          changelog.shutdown();
         }
       }
     }
diff --git a/opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java b/opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java
new file mode 100644
index 0000000..f9fdfa4
--- /dev/null
+++ b/opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java
@@ -0,0 +1,245 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.changelog;
+
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.synchronization.SynchMessages.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+/**
+ * This class is used to represent a Db environement that can be used
+ * to create ChangelogDB.
+ */
+public class ChangelogDbEnv
+{
+  private Environment dbEnvironment = null;
+  private Database stateDb = null;
+  private Changelog changelog = null;
+
+  /**
+   * Initialize this class.
+   * Creates Db environment that will be used to create databases.
+   * It also reads the currently known databases from the "changelogstate"
+   * database.
+   * @param path Path where the backing files must be created.
+   * @param changelog the Changelog that creates this ChangelogDbEnv.
+   * @throws DatabaseException If a DatabaseException occured that prevented
+   *                           the initialization to happen.
+   * @throws ChangelogDBException If a changelog internal error caused
+   *                              a failure of the changelog processing.
+   */
+  public ChangelogDbEnv(String path, Changelog changelog)
+         throws DatabaseException, ChangelogDBException
+  {
+    this.changelog = changelog;
+    EnvironmentConfig envConfig = new EnvironmentConfig();
+
+    /* Create the DB Environment that will be used for all
+     * the Changelog activities related to the db
+     */
+    envConfig.setAllowCreate(true);
+    envConfig.setTransactional(true);
+    envConfig.setConfigParam("je.cleaner.expunge", "true");
+    // TODO : the DB cache size should be configurable
+    // For now set 5M is OK for being efficient in 64M total for the JVM
+    envConfig.setConfigParam("je.maxMemory", "5000000");
+    dbEnvironment = new Environment(new File(path), envConfig);
+
+    /*
+     * One database is created to store the update from each LDAP
+     * server in the topology.
+     * The database "changelogstate" is used to store the list of all
+     * the servers that have been seen in the past.
+     */
+    DatabaseConfig dbConfig = new DatabaseConfig();
+    dbConfig.setAllowCreate(true);
+    dbConfig.setTransactional(true);
+
+    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
+    start();
+
+  }
+
+  /**
+   * Read the list of known servers from the database and start dbHandler
+   * for each of them.
+   *
+   * @throws DatabaseException in case of underlying DatabaseException
+   * @throws ChangelogDBException when the information from the database
+   *                              cannot be decoded correctly.
+   */
+  private void start() throws DatabaseException, ChangelogDBException
+  {
+    Cursor cursor = stateDb.openCursor(null, null);
+    DatabaseEntry key = new DatabaseEntry();
+    DatabaseEntry data = new DatabaseEntry();
+    try
+    {
+      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+      while (status == OperationStatus.SUCCESS)
+      {
+        try
+        {
+          String stringData = new String(data.getData(), "UTF-8");
+          String[] str = stringData.split(" ", 2);
+          short serverId = new Short(str[0]);
+          DN baseDn = null;
+          try
+          {
+            baseDn = DN.decode(str[1]);
+          } catch (DirectoryException e)
+          {
+            int    msgID   = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER;
+            String message = getMessage(msgID, str[1]);
+            logError(ErrorLogCategory.SYNCHRONIZATION,
+                     ErrorLogSeverity.SEVERE_ERROR,
+                     message, msgID);
+          }
+          DbHandler dbHandler =
+            new DbHandler(serverId, baseDn, changelog, this);
+          changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler);
+        } catch (NumberFormatException e)
+        {
+          // should never happen
+          throw new ChangelogDBException(0,
+              "changelog state database has a wrong format");
+        } catch (UnsupportedEncodingException e)
+        {
+          // should never happens
+          throw new ChangelogDBException(0, "need UTF-8 support");
+        }
+        status = cursor.getNext(key, data, LockMode.DEFAULT);
+      }
+      cursor.close();
+
+    } catch (DatabaseException dbe) {
+      cursor.close();
+      throw dbe;
+    }
+  }
+
+  /**
+   * Find or create the database used to store changes from the server
+   * with the given serverId and the given baseDn.
+   * @param serverId The server id that identifies the server.
+   * @param baseDn The baseDn that identifies the server.
+   * @return the Database.
+   * @throws DatabaseException in case of underlying Exception.
+   */
+  public Database getOrAddDb(Short serverId, DN baseDn)
+                  throws DatabaseException
+  {
+    try
+    {
+      String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
+      byte[] byteId;
+
+      byteId = stringId.getBytes("UTF-8");
+
+      // Open the database. Create it if it does not already exist.
+      DatabaseConfig dbConfig = new DatabaseConfig();
+      dbConfig.setAllowCreate(true);
+      dbConfig.setTransactional(true);
+      Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
+
+      DatabaseEntry key = new DatabaseEntry();
+      key.setData(byteId);
+      DatabaseEntry data = new DatabaseEntry();
+      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+      if (status == OperationStatus.NOTFOUND)
+      {
+        Transaction txn = dbEnvironment.beginTransaction(null, null);
+        try {
+          data.setData(byteId);
+          stateDb.put(txn, key, data);
+          txn.commitWriteNoSync();
+        } catch (DatabaseException dbe)
+        {
+          // Abort the txn and propagate the Exception to the caller
+          txn.abort();
+          throw dbe;
+        }
+      }
+      return db;
+    } catch (UnsupportedEncodingException e)
+    {
+      // can't happen
+      return null;
+    }
+  }
+
+  /**
+   * Creates a new transaction.
+   *
+   * @return the transaction.
+   * @throws DatabaseException in case of underlying database Exception.
+   */
+  public Transaction beginTransaction() throws DatabaseException
+  {
+    return dbEnvironment.beginTransaction(null, null);
+  }
+
+  /**
+   * Shutdown the Db environment.
+   */
+  public void shutdown()
+  {
+    try
+    {
+      stateDb.close();
+      dbEnvironment.close();
+    } catch (DatabaseException e)
+    {
+      int    msgID   = MSGID_ERROR_CLOSING_CHANGELOG_ENV;
+      String message = getMessage(msgID) + stackTraceToSingleLineString(e);
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+               ErrorLogSeverity.SEVERE_ERROR,
+               message, msgID);
+    }
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/changelog/DbHandler.java b/opends/src/server/org/opends/server/changelog/DbHandler.java
index 45468b2..593b868 100644
--- a/opends/src/server/org/opends/server/changelog/DbHandler.java
+++ b/opends/src/server/org/opends/server/changelog/DbHandler.java
@@ -87,13 +87,17 @@
    *
    * @param id Identifier of the DB.
    * @param baseDn of the DB.
+   * @param changelog the Changelog that creates this dbHandler.
+   * @param dbenv the Database Env to use to create the Changelog DB.
    * @throws DatabaseException If a database problem happened
    */
-  public DbHandler(short id, DN baseDn) throws DatabaseException
+  public DbHandler(short id, DN baseDn, Changelog changelog,
+      ChangelogDbEnv dbenv)
+         throws DatabaseException
   {
     this.serverId = id;
     this.baseDn = baseDn;
-    db = new ChangelogDB(id, baseDn);
+    db = new ChangelogDB(id, baseDn, changelog, dbenv);
     firstChange = db.readFirstChange();
     lastChange = db.readLastChange();
     thread = new DirectoryThread(this, "changelog db " + id + " " +  baseDn);
@@ -245,6 +249,10 @@
         {}
       }
     }
+
+    while (msgQueue.size() != 0)
+      flush();
+
     db.shutdown();
   }
 
diff --git a/opends/src/server/org/opends/server/changelog/ServerHandler.java b/opends/src/server/org/opends/server/changelog/ServerHandler.java
index d4cc9d6..6934954 100644
--- a/opends/src/server/org/opends/server/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/changelog/ServerHandler.java
@@ -39,6 +39,7 @@
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
 
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigEntry;
@@ -58,6 +59,7 @@
 import org.opends.server.synchronization.ServerState;
 import org.opends.server.synchronization.SynchronizationMessage;
 import org.opends.server.synchronization.UpdateMessage;
+import org.opends.server.synchronization.WindowMessage;
 import org.opends.server.util.TimeThread;
 
 /**
@@ -71,7 +73,7 @@
   private MsgQueue msgQueue = new MsgQueue();
   private MsgQueue lateQueue = new MsgQueue();
   private Map<ChangeNumber, AckMessageList> waitingAcks  =
-          new HashMap<ChangeNumber, AckMessageList>();;
+          new HashMap<ChangeNumber, AckMessageList>();
   private ChangelogCache changelogCache = null;
   private String serverURL;
   private int outCount = 0; // number of update sent to the server
@@ -93,6 +95,12 @@
   private ServerWriter writer = null;
   private DN baseDn = null;
   private String serverAddressURL;
+  private int rcvWindow;
+  private int rcvWindowSizeHalf;
+  private int maxRcvWindow;
+  private ServerReader reader;
+  private Semaphore sendWindow;
+  private int sendWindowSize;
 
   private static Map<ChangeNumber, ChangelogAckMessageList>
    changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -116,20 +124,29 @@
    *
    * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
    *               null if this is an incoming connection.
+   * @param changelogId The identifier of the changelog that creates this
+   *                    server handler.
+   * @param changelogURL The URL of the changelog that creates this
+   *                    server handler.
+   * @param windowSize the window size that this server handler must use.
+   * @param changelog the Changelog that created this server handler.
    */
-  public void start(DN baseDn)
+  public void start(DN baseDn, short changelogId, String changelogURL,
+                    int windowSize, Changelog changelog)
   {
+    rcvWindowSizeHalf = windowSize/2;
+    maxRcvWindow = windowSize;
+    rcvWindow = windowSize;
     try
     {
       if (baseDn != null)
       {
         this.baseDn = baseDn;
-        changelogCache = Changelog.getChangelogCache(baseDn);
+        changelogCache = changelog.getChangelogCache(baseDn);
         ServerState localServerState = changelogCache.getDbServerState();
         ChangelogStartMessage msg =
-          new ChangelogStartMessage(Changelog.getServerId(),
-                                    Changelog.getServerURL(),
-                                    baseDn, localServerState);
+          new ChangelogStartMessage(changelogId, changelogURL,
+                                    baseDn, windowSize, localServerState);
 
         session.publish(msg);
       }
@@ -175,16 +192,15 @@
           restartSendDelay = 0;
         serverIsLDAPserver = true;
 
-        changelogCache = Changelog.getChangelogCache(this.baseDn);
+        changelogCache = changelog.getChangelogCache(this.baseDn);
         ServerState localServerState = changelogCache.getDbServerState();
         ChangelogStartMessage myStartMsg =
-          new ChangelogStartMessage(Changelog.getServerId(),
-                                    Changelog.getServerURL(),
-                                    this.baseDn, localServerState);
+          new ChangelogStartMessage(changelogId, changelogURL,
+                                    this.baseDn, windowSize, localServerState);
         session.publish(myStartMsg);
+        sendWindowSize = receivedMsg.getWindowSize();
       }
-      else if (msg.getClass() == Class.forName(
-      "org.opends.server.synchronization.ChangelogStartMessage"))
+      else if (msg instanceof ChangelogStartMessage)
       {
         ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
         serverId = receivedMsg.getServerId();
@@ -195,17 +211,17 @@
         this.baseDn = receivedMsg.getBaseDn();
         if (baseDn == null)
         {
-          changelogCache = Changelog.getChangelogCache(this.baseDn);
+          changelogCache = changelog.getChangelogCache(this.baseDn);
           ServerState serverState = changelogCache.getDbServerState();
           ChangelogStartMessage outMsg =
-            new ChangelogStartMessage(Changelog.getServerId(),
-                                      Changelog.getServerURL(),
-                                      this.baseDn, serverState);
+            new ChangelogStartMessage(changelogId, changelogURL,
+                                      this.baseDn, windowSize, serverState);
           session.publish(outMsg);
         }
         else
           this.baseDn = baseDn;
         this.serverState = receivedMsg.getServerState();
+        sendWindowSize = receivedMsg.getWindowSize();
       }
       else
       {
@@ -213,7 +229,7 @@
         return;   // we did not recognize the message, ignore it
       }
 
-      changelogCache = Changelog.getChangelogCache(this.baseDn);
+      changelogCache = changelog.getChangelogCache(this.baseDn);
 
       if (serverIsLDAPserver)
       {
@@ -226,7 +242,7 @@
 
       writer = new ServerWriter(session, serverId, this, changelogCache);
 
-      ServerReader reader = new ServerReader(session, serverId, this,
+      reader = new ServerReader(session, serverId, this,
                                              changelogCache);
 
       reader.start();
@@ -251,7 +267,7 @@
         // ignore
       }
     }
-
+    sendWindow = new Semaphore(sendWindowSize);
   }
 
   /**
@@ -576,6 +592,30 @@
    */
   public UpdateMessage take()
   {
+    boolean interrupted = true;
+    UpdateMessage msg = getnextMessage();
+    do {
+      try
+      {
+        sendWindow.acquire();
+        interrupted = false;
+      } catch (InterruptedException e)
+      {
+        // loop until not interrupted
+      }
+    } while (interrupted);
+    this.incrementOutCount();
+    return msg;
+  }
+
+  /**
+   * Get the next update that must be sent to the server
+   * from the message queue or from the database.
+   *
+   * @return The next update that must be sent to the server.
+   */
+  private UpdateMessage getnextMessage()
+  {
     UpdateMessage msg;
     do
     {
@@ -668,7 +708,6 @@
                   msg1 = msgQueue.removeFirst();
                 } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                 this.updateServerState(msg);
-                this.incrementOutCount();
                 return msg;
               }
             }
@@ -679,7 +718,6 @@
           /* get the next change from the lateQueue */
           msg = lateQueue.removeFirst();
           this.updateServerState(msg);
-          this.incrementOutCount();
           return msg;
         }
       }
@@ -707,7 +745,6 @@
              * by the other server.
              * Otherwise just loop to select the next message.
              */
-            this.incrementOutCount();
             return msg;
           }
         }
@@ -927,7 +964,7 @@
   @Override
   public String getMonitorInstanceName()
   {
-    String str = changelogCache.getBaseDn().toString() +
+    String str = baseDn.toString() +
                  " " + serverURL + " " + String.valueOf(serverId);
 
     if (serverIsLDAPserver)
@@ -985,7 +1022,7 @@
     attributes.add(new Attribute("server-id",
                                  String.valueOf(serverId)));
     attributes.add(new Attribute("base-dn",
-                                 changelogCache.getBaseDn().toString()));
+                                 baseDn.toString()));
     attributes.add(new Attribute("waiting-changes",
                                  String.valueOf(getRcvMsgQueueSize())));
     attributes.add(new Attribute("update-waiting-acks",
@@ -999,6 +1036,14 @@
                                  String.valueOf(getInAckCount())));
     attributes.add(new Attribute("approximate-delay",
                                  String.valueOf(getApproxDelay())));
+    attributes.add(new Attribute("max-send-window",
+                                 String.valueOf(sendWindowSize)));
+    attributes.add(new Attribute("current-send-window",
+                                String.valueOf(sendWindow.availablePermits())));
+    attributes.add(new Attribute("max-rcv-window",
+                                 String.valueOf(maxRcvWindow)));
+    attributes.add(new Attribute("current-rcv-window",
+                                 String.valueOf(rcvWindow)));
     long olderUpdateTime = getOlderUpdateTime();
     if (olderUpdateTime != 0)
     {
@@ -1058,4 +1103,33 @@
 
     return localString;
   }
+
+  /**
+   * Check the protocol window and send WindowMessage if necessary.
+   *
+   * @throws IOException when the session becomes unavailable.
+   */
+  public synchronized void checkWindow() throws IOException
+  {
+    rcvWindow--;
+    if (rcvWindow < rcvWindowSizeHalf)
+    {
+      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+      session.publish(msg);
+      outAckCount++;
+      rcvWindow += rcvWindowSizeHalf;
+    }
+  }
+
+  /**
+   * Update the send window size based on the credit specified in the
+   * given window message.
+   *
+   * @param windowMsg The Window Message containing the information
+   *                  necessary for updating the window size.
+   */
+  public void updateWindow(WindowMessage windowMsg)
+  {
+    sendWindow.release(windowMsg.getNumAck());
+  }
 }
diff --git a/opends/src/server/org/opends/server/changelog/ServerReader.java b/opends/src/server/org/opends/server/changelog/ServerReader.java
index ab8a68e..d0f1126 100644
--- a/opends/src/server/org/opends/server/changelog/ServerReader.java
+++ b/opends/src/server/org/opends/server/changelog/ServerReader.java
@@ -36,6 +36,7 @@
 import org.opends.server.synchronization.AckMessage;
 import org.opends.server.synchronization.SynchronizationMessage;
 import org.opends.server.synchronization.UpdateMessage;
+import org.opends.server.synchronization.WindowMessage;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
 
@@ -105,8 +106,14 @@
         else if (msg instanceof UpdateMessage)
         {
           UpdateMessage update = (UpdateMessage) msg;
+          handler.checkWindow();
           changelogCache.put(update, handler);
         }
+        else if (msg instanceof WindowMessage)
+        {
+          WindowMessage windowMsg = (WindowMessage) msg;
+          handler.updateWindow(windowMsg);
+        }
       }
     } catch (IOException e)
     {
diff --git a/opends/src/server/org/opends/server/changelog/SocketSession.java b/opends/src/server/org/opends/server/changelog/SocketSession.java
index 4bb87d6..13708ad 100644
--- a/opends/src/server/org/opends/server/changelog/SocketSession.java
+++ b/opends/src/server/org/opends/server/changelog/SocketSession.java
@@ -60,6 +60,10 @@
   public SocketSession(Socket socket) throws IOException
   {
     this.socket = socket;
+    /*
+     * Use a window instead of the TCP flow control.
+     * Therefore set a very large value for send and receive buffer sizes.
+     */
     input = socket.getInputStream();
     output = socket.getOutputStream();
   }
diff --git a/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index c9a0605..2778e34 100644
--- a/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -34,6 +34,7 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -78,6 +79,12 @@
   private int maxReceiveDelay;
   private int maxSendQueue;
   private int maxReceiveQueue;
+  private Semaphore sendWindow;
+  private int maxSendWindow;
+  private int rcvWindow;
+  private int halfRcvWindow;
+  private int maxRcvWindow;
+  private int timeout = 0;
 
   /**
    * Creates a new Changelog Broker for a particular SynchronizationDomain.
@@ -95,10 +102,11 @@
    * @param maxSendQueue The maximum size of the send queue to use on
    *                     the changelog server.
    * @param maxSendDelay The maximum send delay to use on the changelog server.
+   * @param window The size of the send and receive window to use.
    */
   public ChangelogBroker(ServerState state, DN baseDn, short serverID,
       int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-      int maxSendDelay )
+      int maxSendDelay, int window)
   {
     this.baseDn = baseDn;
     this.serverID = serverID;
@@ -109,6 +117,9 @@
     this.state = state;
     replayOperations =
       new TreeSet<FakeOperation>(new FakeOperationComparator());
+    this.rcvWindow = window;
+    this.maxRcvWindow = window;
+    this.halfRcvWindow = window/2;
   }
 
   /**
@@ -165,6 +176,7 @@
           InetSocketAddress ServerAddr = new InetSocketAddress(
               InetAddress.getByName(hostname), Integer.parseInt(port));
           Socket socket = new Socket();
+          socket.setReceiveBufferSize(1000000);
           socket.connect(ServerAddr, 500);
           session = new SocketSession(socket);
 
@@ -173,7 +185,7 @@
            */
           ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
               maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              state);
+              halfRcvWindow*2, state);
           session.publish(msg);
 
 
@@ -182,7 +194,7 @@
            */
           session.setSoTimeout(1000);
           startMsg = (ChangelogStartMessage) session.receive();
-          session.setSoTimeout(0);
+          session.setSoTimeout(timeout);
 
           /*
            * We must not publish changes to a changelog that has not
@@ -202,6 +214,8 @@
               (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
           {
             changelogServer = ServerAddr.toString();
+            maxSendWindow = startMsg.getWindowSize();
+            this.sendWindow = new Semaphore(maxSendWindow);
             connected = true;
             break;
           }
@@ -254,6 +268,8 @@
               else
               {
                 changelogServer = ServerAddr.toString();
+                maxSendWindow = startMsg.getWindowSize();
+                this.sendWindow = new Semaphore(maxSendWindow);
                 connected = true;
                 for (FakeOperation replayOp : replayOperations)
                 {
@@ -306,6 +322,14 @@
            * changes that this server has already processed, start again
            * the loop looking for any changelog server.
            */
+          try
+          {
+            Thread.sleep(500);
+          } catch (InterruptedException e)
+          {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
           checkState = false;
           int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
           String message = getMessage(msgID);
@@ -393,13 +417,18 @@
         {
           if (this.connected == false)
             this.reStart(failingSession);
-
+          if (msg instanceof UpdateMessage)
+            sendWindow.acquire();
           session.publish(msg);
           done = true;
         } catch (IOException e)
         {
           this.reStart(failingSession);
         }
+        catch (InterruptedException e)
+        {
+          this.reStart(failingSession);
+        }
       }
     }
   }
@@ -418,7 +447,25 @@
       ProtocolSession failingSession = session;
       try
       {
-        return session.receive();
+        SynchronizationMessage msg = session.receive();
+        if (msg instanceof WindowMessage)
+        {
+          WindowMessage windowMsg = (WindowMessage) msg;
+          sendWindow.release(windowMsg.getNumAck());
+        }
+        else
+        {
+          if (msg instanceof UpdateMessage)
+          {
+            rcvWindow--;
+            if (rcvWindow < halfRcvWindow)
+            {
+              session.publish(new WindowMessage(halfRcvWindow));
+              rcvWindow += halfRcvWindow;
+            }
+          }
+          return msg;
+        }
       } catch (Exception e)
       {
         if (e instanceof SocketTimeoutException)
@@ -485,6 +532,7 @@
    */
   public void setSoTimeout(int timeout) throws SocketException
   {
+    this.timeout = timeout;
     session.setSoTimeout(timeout);
   }
 
@@ -532,4 +580,47 @@
   {
     // TODO to be implemented
   }
+
+  /**
+   * Get the maximum receive window size.
+   *
+   * @return The maximum receive window size.
+   */
+  public int getMaxRcvWindow()
+  {
+    return maxRcvWindow;
+  }
+
+  /**
+   * Get the current receive window size.
+   *
+   * @return The current receive window size.
+   */
+  public int getCurrentRcvWindow()
+  {
+    return rcvWindow;
+  }
+
+  /**
+   * Get the maximum send window size.
+   *
+   * @return The maximum send window size.
+   */
+  public int getMaxSendWindow()
+  {
+    return maxSendWindow;
+  }
+
+  /**
+   * Get the current send window size.
+   *
+   * @return The current send window size.
+   */
+  public int getCurrentSendWindow()
+  {
+    if (connected)
+      return sendWindow.availablePermits();
+    else
+      return 0;
+  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java b/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
index 5cd6738..2767579 100644
--- a/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
@@ -46,15 +46,19 @@
   private String serverURL;
   private ServerState serverState;
 
+  private int windowSize;
+
   /**
    * Create a ChangelogStartMessage.
    *
    * @param serverId changelog server id
    * @param serverURL changelog server URL
    * @param baseDn base DN for which the ChangelogStartMessage is created.
+   * @param windowSize The window size.
    * @param serverState our ServerState for this baseDn.
    */
   public ChangelogStartMessage(short serverId, String serverURL, DN baseDn,
+                               int windowSize,
                                ServerState serverState)
   {
     this.serverId = serverId;
@@ -63,6 +67,7 @@
       this.baseDn = baseDn.toNormalizedString();
     else
       this.baseDn = null;
+    this.windowSize = windowSize;
     this.serverState = serverState;
   }
 
@@ -76,7 +81,7 @@
   public ChangelogStartMessage(byte[] in) throws DataFormatException
   {
     /* The ChangelogStartMessage is encoded in the form :
-     * <baseDn><ServerId><ServerUrl><ServerState>
+     * <baseDn><ServerId><ServerUrl><windowsize><ServerState>
      */
     try
     {
@@ -108,6 +113,13 @@
       pos += length +1;
 
       /*
+       * read the window size
+       */
+      length = getNextLength(in, pos);
+      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      /*
       * read the ServerState
       */
       serverState = new ServerState(in, pos, in.length-1);
@@ -179,16 +191,18 @@
   public byte[] getBytes()
   {
     /* The ChangelogStartMessage is stored in the form :
-     * <operation type><basedn><serverid><serverURL><serverState>
+     * <operation type><basedn><serverid><serverURL><windowsize><serverState>
      */
     try {
       byte[] byteDn = baseDn.getBytes("UTF-8");
       byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
       byte[] byteServerUrl = serverURL.getBytes("UTF-8");
       byte[] byteServerState = serverState.getBytes();
+      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
 
       int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
-      byteServerUrl.length + 1 + byteServerState.length + 1;
+          byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+          byteServerState.length + 1;
 
       byte[] resultByteArray = new byte[length];
 
@@ -205,6 +219,9 @@
       /* put the ServerURL */
       pos = addByteArray(byteServerUrl, resultByteArray, pos);
 
+      /* put the window size */
+      pos = addByteArray(byteWindowSize, resultByteArray, pos);
+
       /* put the ServerState */
       pos = addByteArray(byteServerState, resultByteArray, pos);
 
@@ -215,4 +232,14 @@
       return null;
     }
   }
+
+  /**
+   * get the window size for the server that created this message.
+   *
+   * @return The window size for the server that created this message.
+   */
+  public int getWindowSize()
+  {
+    return windowSize;
+  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java b/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
index d354f23..7369a49 100644
--- a/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
+++ b/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
@@ -446,7 +446,7 @@
 
     // shutdown the Changelog Service if necessary
     if (changelog != null)
-      Changelog.shutdown();
+      changelog.shutdown();
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java b/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
index c9fa051..a93d6ee 100644
--- a/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
@@ -52,6 +52,7 @@
   private int maxSendQueue;
   private int maxReceiveDelay;
   private int maxSendDelay;
+  private int windowSize;
   private ServerState serverState = null;
 
   /**
@@ -64,11 +65,13 @@
    * @param maxReceiveQueue The max receive Queue for this server.
    * @param maxSendDelay The max Send Delay from this server.
    * @param maxSendQueue The max send Queue from this server.
+   * @param windowSize   The window size used by this server.
    * @param serverState  The state of this server.
    */
   public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                             int maxReceiveQueue, int maxSendDelay,
-                            int maxSendQueue, ServerState serverState)
+                            int maxSendQueue, int windowSize,
+                            ServerState serverState)
   {
     this.serverId = serverId;
     this.baseDn = baseDn.toString();
@@ -77,6 +80,7 @@
     this.maxSendDelay = maxSendDelay;
     this.maxSendQueue = maxSendQueue;
     this.serverState = serverState;
+    this.windowSize = windowSize;
 
     try
     {
@@ -100,7 +104,7 @@
   {
     /* The ServerStartMessage is encoded in the form :
      * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
-     * <maxSendDelay><maxSendQueue><ServerState>
+     * <maxSendDelay><maxSendQueue><window><ServerState>
      */
     try
     {
@@ -161,6 +165,13 @@
       pos += length +1;
 
       /*
+       * read the windowSize
+       */
+      length = getNextLength(in, pos);
+      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      /*
       * read the ServerState
       */
       serverState = new ServerState(in, pos, in.length-1);
@@ -269,7 +280,7 @@
     /*
      * ServerStartMessage contains.
      * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
-     * <maxSendDelay><maxSendQueue><ServerState>
+     * <maxSendDelay><maxSendQueue><windowsize><ServerState>
      */
     try {
       byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -283,6 +294,8 @@
                      String.valueOf(maxSendDelay).getBytes("UTF-8");
       byte[] byteMaxSendQueue =
                      String.valueOf(maxSendQueue).getBytes("UTF-8");
+      byte[] byteWindowSize =
+                     String.valueOf(windowSize).getBytes("UTF-8");
       byte[] byteServerState = serverState.getBytes();
 
       int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
@@ -291,6 +304,7 @@
                    byteMaxRecvQueue.length + 1 +
                    byteMaxSendDelay.length + 1 +
                    byteMaxSendQueue.length + 1 +
+                   byteWindowSize.length + 1 +
                    byteServerState.length + 1;
 
       byte[] resultByteArray = new byte[length];
@@ -313,6 +327,8 @@
 
       pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
 
+      pos = addByteArray(byteWindowSize, resultByteArray, pos);
+
       pos = addByteArray(byteServerState, resultByteArray, pos);
 
       return resultByteArray;
@@ -322,4 +338,14 @@
       return null;
     }
   }
+
+  /**
+   * Get the window size for the ldap server that created the message.
+   *
+   * @return The window size for the ldap server that created the message.
+   */
+  public int getWindowSize()
+  {
+    return windowSize;
+  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index 7b401b2..766359b 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -138,6 +138,7 @@
   static String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay";
   static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
   static String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
+  static String WINDOW_SIZE = "ds-cfg-window-size";
 
   private static final StringConfigAttribute changelogStub =
     new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
@@ -275,6 +276,20 @@
       configAttributes.add(maxSendDelayAttr);
     }
 
+    Integer window;
+    IntegerConfigAttribute windowStub =
+      new IntegerConfigAttribute(WINDOW_SIZE, "window size",
+                                 false, false, false, true, 0, false, 0);
+    IntegerConfigAttribute windowAttr =
+      (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub);
+    if (windowAttr == null)
+      window = 100;  // Attribute is not present : use the default value
+    else
+    {
+      window = windowAttr.activeIntValue();
+      configAttributes.add(windowAttr);
+    }
+
     configDn = configEntry.getDN();
     DirectoryServer.registerConfigurableComponent(this);
 
@@ -292,7 +307,7 @@
     try
     {
       broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
-          maxReceiveDelay, maxSendQueue, maxSendDelay);
+          maxReceiveDelay, maxSendQueue, maxSendDelay, window);
       synchronized (broker)
       {
         broker.start(changelogServers);
@@ -882,10 +897,7 @@
    */
   public int getPendingUpdatesCount()
   {
-    synchronized (pendingChanges)
-    {
-      return pendingChanges.size();
-    }
+    return pendingChanges.size();
   }
 
   /**
@@ -1662,4 +1674,44 @@
     }
     return true;
   }
+
+  /**
+   * Get the maximum receive window size.
+   *
+   * @return The maximum receive window size.
+   */
+  public int getMaxRcvWindow()
+  {
+    return broker.getMaxRcvWindow();
+  }
+
+  /**
+   * Get the current receive window size.
+   *
+   * @return The current receive window size.
+   */
+  public int getCurrentRcvWindow()
+  {
+    return broker.getCurrentRcvWindow();
+  }
+
+  /**
+   * Get the maximum send window size.
+   *
+   * @return The maximum send window size.
+   */
+  public int getMaxSendWindow()
+  {
+    return broker.getMaxSendWindow();
+  }
+
+  /**
+   * Get the current send window size.
+   *
+   * @return The current send window size.
+   */
+  public int getCurrentSendWindow()
+  {
+    return broker.getCurrentSendWindow();
+  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java b/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
index bc9e663..800bf39 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
@@ -46,6 +46,7 @@
   static final byte MSG_TYPE_ACK = 5;
   static final byte MSG_TYPE_SERVER_START = 6;
   static final byte MSG_TYPE_CHANGELOG_START = 7;
+  static final byte MSG_TYPE_WINDOW = 8;
 
   /**
    * Do the processing necessary when the message is received.
@@ -106,6 +107,9 @@
       case MSG_TYPE_CHANGELOG_START:
         msg = new ChangelogStartMessage(buffer);
       break;
+      case MSG_TYPE_WINDOW:
+        msg = new WindowMessage(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java b/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
index f25a05d..c59ecec 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
@@ -98,64 +98,37 @@
     attributes.add(attr);
 
     /* get number of received updates */
-    final String ATTR_UPDATE_RECVD = "received-updates";
-    AttributeType type =
-                    DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_RECVD);
-    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(type,
-                                  String.valueOf(domain.getNumRcvdUpdates())));
-    attr = new Attribute(type, "received-updates", values);
-    attributes.add(attr);
+    addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates());
 
     /* get number of updates sent */
-    final String ATTR_UPDATE_SENT = "sent-updates";
-    type =  DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_SENT);
-    values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(type,
-                                  String.valueOf(domain.getNumSentUpdates())));
-    attr = new Attribute(type, "sent-updates", values);
-    attributes.add(attr);
+    addMonitorData(attributes, "sent-updates", domain.getNumSentUpdates());
 
     /* get number of changes in the pending list */
-    final String ATTR_UPDATE_PENDING = "pending-updates";
-    type =  DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_PENDING);
-    values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(type,
-                              String.valueOf(domain.getPendingUpdatesCount())));
-    attr = new Attribute(type, "pending-updates", values);
-    attributes.add(attr);
+    addMonitorData(attributes, "pending-updates",
+                   domain.getPendingUpdatesCount());
 
     /* get number of changes replayed */
-    final String ATTR_REPLAYED_UPDATE = "replayed-updates";
-    type =  DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE);
-    values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(type,
-                              String.valueOf(domain.getNumProcessedUpdates())));
-    attr = new Attribute(type, ATTR_REPLAYED_UPDATE, values);
-    attributes.add(attr);
+    addMonitorData(attributes, "replayed-updates",
+                   domain.getNumProcessedUpdates());
 
     /* get number of changes successfully */
-    final String ATTR_REPLAYED_UPDATE_OK = "replayed-updates-ok";
-    type =  DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE_OK);
-    values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(type,
-                          String.valueOf(domain.getNumReplayedPostOpCalled())));
-    attr = new Attribute(type, ATTR_REPLAYED_UPDATE_OK, values);
-    attributes.add(attr);
+    addMonitorData(attributes, "replayed-updates-ok",
+                   domain.getNumReplayedPostOpCalled());
 
-    /* get debugCount */
-    final String DEBUG_COUNT = "debug-count";
-    type =  DirectoryServer.getDefaultAttributeType(DEBUG_COUNT);
-    values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(type,
-                          String.valueOf(domain.getDebugCount())));
-    attr = new Attribute(type, DEBUG_COUNT, values);
-    attributes.add(attr);
+    /* get window information */
+    addMonitorData(attributes, "max-rcv-window", domain.getMaxRcvWindow());
+    addMonitorData(attributes, "current-rcv-window",
+                               domain.getCurrentRcvWindow());
+    addMonitorData(attributes, "max-send-window",
+                               domain.getMaxSendWindow());
+    addMonitorData(attributes, "current-send-window",
+                               domain.getCurrentSendWindow());
 
     /* get the Server State */
     final String ATTR_SERVER_STATE = "server-state";
-    type =  DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
-    values = new LinkedHashSet<AttributeValue>();
+    AttributeType type =
+      DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
+    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
     for (String str : domain.getServerState().toStringSet())
     {
       values.add(new AttributeValue(type,str));
@@ -168,6 +141,27 @@
   }
 
   /**
+   * Add an attribute with an integer value to the list of monitoring
+   * attributes.
+   *
+   * @param attributes the list of monitoring attributes
+   * @param name the name of the attribute to add.
+   * @param value The integer value of he attribute to add.
+   */
+  private void addMonitorData(ArrayList<Attribute> attributes,
+       String name, int value)
+  {
+    Attribute attr;
+    AttributeType type;
+    LinkedHashSet<AttributeValue> values;
+    type =  DirectoryServer.getDefaultAttributeType(name);
+    values = new LinkedHashSet<AttributeValue>();
+    values.add(new AttributeValue(type, String.valueOf(value)));
+    attr = new Attribute(type, name, values);
+    attributes.add(attr);
+  }
+
+  /**
    * Retrieves the length of time in milliseconds that should elapse between
    * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
    * return value indicates that the <CODE>updateMonitorData()</CODE> method
diff --git a/opends/src/server/org/opends/server/synchronization/WindowMessage.java b/opends/src/server/org/opends/server/synchronization/WindowMessage.java
new file mode 100644
index 0000000..868bd12
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/WindowMessage.java
@@ -0,0 +1,141 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This message is used by LDAP server when they first connect.
+ * to a changelog server to let them know who they are and what is their state
+ * (their RUV)
+ */
+public class WindowMessage extends SynchronizationMessage implements
+    Serializable
+{
+  private static final long serialVersionUID = 8442267608764026867L;
+  private final int numAck;
+
+
+  /**
+   * Create a new WindowMessage.
+   *
+   * @param numAck The number of acknowledged messages.
+   *               The window will be increase by this number.
+   */
+  public WindowMessage(int numAck)
+  {
+    this.numAck = numAck;
+  }
+
+  /**
+   * Creates a new WindowMessage from its encoded form.
+   *
+   * @param in The byte array containing the encoded form of the
+   *           WindowMessage.
+   * @throws DataFormatException If the byte array does not contain a valid
+   *                             encoded form of the WindowMessage.
+   */
+  public WindowMessage(byte[] in) throws DataFormatException
+  {
+    /* The WindowMessage is encoded in the form :
+     * <numAck>
+     */
+    try
+    {
+      /* first byte is the type */
+      if (in[0] != MSG_TYPE_WINDOW)
+        throw new DataFormatException("input is not a valid Window Message");
+      int pos = 1;
+
+      /*
+       * read the number of acks contained in this message.
+       * first calculate the length then construct the string
+       */
+      int length = getNextLength(in, pos);
+      String numAckStr = new String(in, pos, length, "UTF-8");
+      pos += length +1;
+      numAck = Integer.parseInt(numAckStr);
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    /*
+     * WindowMessage contains.
+     * <numAck>
+     */
+    try {
+      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
+
+      int length = 1 + byteNumAck.length + 1;
+
+      byte[] resultByteArray = new byte[length];
+
+      /* put the type of the operation */
+      resultByteArray[0] = MSG_TYPE_WINDOW;
+      int pos = 1;
+
+      pos = addByteArray(byteNumAck, resultByteArray, pos);
+
+      return resultByteArray;
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      return null;
+    }
+  }
+
+
+  /**
+   * Get the number of message acknowledged by the Window Message.
+   *
+   * @return the number of message acknowledged by the Window Message.
+   */
+  public int getNumAck()
+  {
+    return numAck;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public UpdateMessage processReceive(SynchronizationDomain domain)
+  {
+    return null;
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 2494e27..c534159 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -70,6 +70,14 @@
        "org.opends.server.BuildRoot";
 
   /**
+   * The name of the system property that specifies the ldap port.
+   * Set this prtoperty when running the server if you want to use a given
+   * port number, otherwise a port is choosed randomly at test startup time.
+   */
+  public static final String PROPERTY_LDAP_PORT =
+       "org.opends.server.LdapPort";
+  
+  /**
    * The string representation of the DN that will be used as the base entry for
    * the test backend.  This must not be changed, as there are a number of test
    * cases that depend on this specific value of "o=test".
@@ -205,8 +213,17 @@
     ServerSocket serverJmxSocket   = null;
     ServerSocket serverLdapsSocket = null;
 
-    serverLdapSocket = bindFreePort();
-    serverLdapPort = serverLdapSocket.getLocalPort();
+    String ldapPort = System.getProperty(PROPERTY_LDAP_PORT);
+    if (ldapPort == null)
+    {
+      serverLdapSocket = bindFreePort();
+      serverLdapPort = serverLdapSocket.getLocalPort();
+    }
+    else
+    {
+      serverLdapPort = Integer.valueOf(ldapPort);
+      serverLdapSocket = bindPort(serverLdapPort);
+    }
 
     serverJmxSocket = bindFreePort();
     serverJmxPort = serverJmxSocket.getLocalPort();
@@ -263,6 +280,23 @@
   }
 
   /**
+   * Binds to the given socket port on the local host.
+   * @return the bounded Server socket.
+   *
+   * @throws IOException in case of underlying exception.
+   * @throws SocketException in case of underlying exception.
+   */
+  private static ServerSocket bindPort(int port)
+          throws IOException, SocketException
+  {
+    ServerSocket serverLdapSocket;
+    serverLdapSocket = new ServerSocket();
+    serverLdapSocket.setReuseAddress(true);
+    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port));
+    return serverLdapSocket;
+  }
+
+  /**
    * Find and binds to a free server socket port on the local host.
    * @return the bounded Server socket.
    *
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
new file mode 100644
index 0000000..e0a952e
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -0,0 +1,494 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization;
+
+import static org.opends.server.loggers.Error.logError;
+import static org.testng.Assert.*;
+
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.config.ConfigEntry;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.core.Operation;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPException;
+import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.OperationType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchScope;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the contructors, encoders and decoders of the synchronization AckMsg,
+ * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
+ */
+public class ProtocolWindowTest
+{
+  private static final int WINDOW_SIZE = 10;
+
+  private static final String SYNCHRONIZATION_STRESS_TEST =
+    "Synchronization Stress Test";
+
+  /**
+   * The internal connection used for operation
+   */
+  private InternalClientConnection connection;
+
+  /**
+   * Created entries that need to be deleted for cleanup
+   */
+  private ArrayList<Entry> entryList = new ArrayList<Entry>();
+
+  /**
+   * The Synchronization config manager entry
+   */
+  private String synchroStringDN;
+
+  /**
+   * The synchronization plugin entry
+   */
+  private String synchroPluginStringDN;
+
+  private Entry synchroPluginEntry;
+
+  /**
+   * The Server synchro entry
+   */
+  private String synchroServerStringDN;
+
+  private Entry synchroServerEntry;
+
+  /**
+   * The Change log entry
+   */
+  private String changeLogStringDN;
+
+  private Entry changeLogEntry;
+
+  /**
+   * A "person" entry
+   */
+  private Entry personEntry;
+
+  /**
+   * schema check flag
+   */
+  private boolean schemaCheck;
+
+  // WORKAROUND FOR BUG #639 - BEGIN -
+  /**
+   *
+   */
+  MultimasterSynchronization mms;
+
+  // WORKAROUND FOR BUG #639 - END -
+
+  /**
+   * Test the window mechanism by :
+   *  - creating a Changelog service client using the ChangelogBroker class.
+   *  - set a small window size.
+   *  - perform more than the window size operations.
+   *  - check that the Changelog has not sent more than window size operations.
+   *  - receive all messages from the ChangelogBroker, check that
+   *    the client receives the correct number of operations.
+   */
+  @Test(enabled=true, groups="slow")
+  public void saturateAndRestart() throws Exception
+  {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1);
+    
+    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+    cleanEntries();
+
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 13);
+
+    try {
+      
+      /* Test that changelog monitor and synchro plugin monitor informations
+       * publish the correct window size.
+       * This allows both the check the monitoring code and to test that
+       * configuration is working.
+       */
+      Thread.sleep(1500);
+      assertTrue(checkWindows(WINDOW_SIZE));
+      
+      // Create an Entry (add operation) that will be later used in the test.
+      Entry tmp = personEntry.duplicate();
+      AddOperation addOp = new AddOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, tmp.getDN(),
+          tmp.getObjectClasses(), tmp.getUserAttributes(),
+          tmp.getOperationalAttributes());
+      addOp.run();
+      entryList.add(personEntry);
+      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+        "The Add Entry operation failed");
+
+      // Check if the client has received the msg
+      SynchronizationMessage msg = broker.receive();
+      assertTrue(msg instanceof AddMsg,
+        "The received synchronization message is not an ADD msg");
+      AddMsg addMsg =  (AddMsg) msg;
+
+      Operation receivedOp = addMsg.createOperation(connection);
+      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+        "The received synchronization message is not an ADD msg");
+
+      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+        "The received ADD synchronization message is not for the excepted DN");
+
+      // send twice the window modify operations
+      int count = WINDOW_SIZE * 2;
+      processModify(count);
+
+      // let some time to the message to reach the changelog client
+      Thread.sleep(500);
+
+      // check that the changelog only sent WINDOW_SIZE messages
+      assertTrue(searchUpdateSent());
+
+      int rcvCount=0;
+      try
+      {
+        while (true)
+        {
+          broker.receive();
+          rcvCount++;
+        }
+      }
+      catch (SocketTimeoutException e)
+      {}
+      /*
+       * check that we received all updates
+       */
+      assertEquals(rcvCount, WINDOW_SIZE*2);
+    }
+    finally {
+      broker.stop();
+      DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+    }
+  }
+
+  /**
+   * Check that the window configuration has been successfull
+   * by reading the monitoring information and checking 
+   * that we do have 2 entries with the configured max-rcv-window.
+   */
+  private boolean checkWindows(int windowSize) throws LDAPException
+  {
+    InternalSearchOperation op = connection.processSearch(
+        new ASN1OctetString("cn=monitor"),
+        SearchScope.WHOLE_SUBTREE,
+        LDAPFilter.decode("(max-rcv-window=" + windowSize + ")"));
+    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+    return (op.getEntriesSent() == 3);
+  }
+
+  /**
+   * Search that the changelog has stopped sending changes after 
+   * having reach the limit of the window size.
+   * Do this by checking the monitoring information.
+   */
+  private boolean searchUpdateSent() throws Exception
+  {
+    InternalSearchOperation op = connection.processSearch(
+        new ASN1OctetString("cn=monitor"),
+        SearchScope.WHOLE_SUBTREE,
+        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
+    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+    return (op.getEntriesSent() == 1);
+  }
+
+  /**
+   * Set up the environment for performing the tests in this Class.
+   * synchronization
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @BeforeClass
+  public void setUp() throws Exception
+  {
+    // This test suite depends on having the schema available.
+    TestCaseUtils.startServer();
+
+    // Disable schema check
+    schemaCheck = DirectoryServer.checkSchema();
+    DirectoryServer.setCheckSchema(false);
+
+    // Create an internal connection
+    connection = new InternalClientConnection();
+
+    // Create backend top level entries
+    String[] topEntries = new String[2];
+    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
+        + "objectClass: domain\n";
+    topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+        + "objectClass: organizationalUnit\n"
+        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+    Entry entry;
+    for (int i = 0; i < topEntries.length; i++)
+    {
+      entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
+      AddOperation addOp = new AddOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+              .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+          entry.getUserAttributes(), entry.getOperationalAttributes());
+      addOp.setInternalOperation(true);
+      addOp.run();
+      entryList.add(entry);
+    }
+
+    // top level synchro provider
+    synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+    // Multimaster Synchro plugin
+    synchroPluginStringDN = "cn=Multimaster Synchronization, "
+        + synchroStringDN;
+    String synchroPluginLdif = "dn: "
+        + synchroPluginStringDN
+        + "\n"
+        + "objectClass: top\n"
+        + "objectClass: ds-cfg-synchronization-provider\n"
+        + "ds-cfg-synchronization-provider-enabled: true\n"
+        + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
+    synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
+
+    // Change log
+    changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+        + "objectClass: top\n"
+        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
+        + "ds-cfg-changelog-server-id: 1\n"
+        + "ds-cfg-window-size: " + WINDOW_SIZE;
+    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+
+    // suffix synchronized
+    synchroServerStringDN = "cn=example, " + synchroPluginStringDN;
+    String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
+        + "objectClass: top\n"
+        + "objectClass: ds-cfg-synchronization-provider-config\n"
+        + "cn: example\n"
+        + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
+        + "ds-cfg-changelog-server: localhost:8989\n"
+        + "ds-cfg-directory-server-id: 1\n"
+        + "ds-cfg-receive-status: true\n"
+        + "ds-cfg-window-size: " + WINDOW_SIZE;
+    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+
+    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
+        + "objectClass: top\n" + "objectClass: person\n"
+        + "objectClass: organizationalPerson\n"
+        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+        + "homePhone: 951-245-7634\n"
+        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+        + "mobile: 027-085-0537\n"
+        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
+        + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+        + "street: 17984 Thirteenth Street\n"
+        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+        + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+        + "userPassword: password\n" + "initials: AA\n";
+    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+    configureSynchronization();
+  }
+
+  /**
+   * Clean up the environment. return null;
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @AfterClass
+  public void classCleanUp() throws Exception
+  {
+    DirectoryServer.setCheckSchema(schemaCheck);
+
+    // WORKAROUND FOR BUG #639 - BEGIN -
+    DirectoryServer.deregisterSynchronizationProvider(mms);
+    mms.finalizeSynchronizationProvider();
+    // WORKAROUND FOR BUG #639 - END -
+
+    cleanEntries();
+  }
+
+  /**
+   * suppress all the entries created by the tests in this class
+   */
+  private void cleanEntries()
+  {
+    DeleteOperation op;
+    // Delete entries
+    Entry entries[] = entryList.toArray(new Entry[0]);
+    for (int i = entries.length - 1; i != 0; i--)
+    {
+      try
+      {
+        op = new DeleteOperation(connection, InternalClientConnection
+            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+            entries[i].getDN());
+        op.run();
+      } catch (Exception e)
+      {
+      }
+    }
+  }
+
+  /**
+   * @return
+   */
+  private List<Modification> generatemods(String attrName, String attrValue)
+  {
+    AttributeType attrType =
+      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
+    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+    values.add(new AttributeValue(attrType, attrValue));
+    Attribute attr = new Attribute(attrType, attrName, values);
+    List<Modification> mods = new ArrayList<Modification>();
+    Modification mod = new Modification(ModificationType.REPLACE, attr);
+    mods.add(mod);
+    return mods;
+  }
+
+  /**
+   * Open a changelog session to the local Changelog server.
+   *
+   */
+  private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
+          throws Exception, SocketException
+  {
+    ServerState state = new ServerState(baseDn);
+    state.loadState();
+    ChangelogBroker broker =
+      new ChangelogBroker(state, baseDn, serverId, 0, 0, 0, 0, WINDOW_SIZE);
+    ArrayList<String> servers = new ArrayList<String>(1);
+    servers.add("localhost:8989");
+    broker.start(servers);
+    broker.setSoTimeout(5000);
+    /*
+     * loop receiving update until there is nothing left
+     * to make sure that message from previous tests have been consumed.
+     */
+    try
+    {
+      while (true)
+      {
+        broker.receive();
+      }
+    }
+    catch (Exception e)
+    { }
+    return broker;
+  }
+
+  /**
+   * Configure the Synchronization for this test.
+   */
+  private void configureSynchronization() throws Exception
+  {
+    //
+    // Add the Multimaster synchronization plugin
+    DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
+    entryList.add(synchroPluginEntry);
+    assertNotNull(DirectoryServer.getConfigEntry(DN
+        .decode(synchroPluginStringDN)),
+        "Unable to add the Multimaster synchronization plugin");
+
+    // WORKAROUND FOR BUG #639 - BEGIN -
+    DN dn = DN.decode(synchroPluginStringDN);
+    ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
+    mms = new MultimasterSynchronization();
+    try
+    {
+      mms.initializeSynchronizationProvider(mmsConfigEntry);
+    }
+    catch (ConfigException e)
+    {
+      assertTrue(false,
+          "Unable to initialize the Multimaster synchronization plugin");
+    }
+    DirectoryServer.registerSynchronizationProvider(mms);
+    // WORKAROUND FOR BUG #639 - END -
+
+    //
+    // Add the changelog server
+    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
+    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
+        "Unable to add the changeLog server");
+    entryList.add(changeLogEntry);
+
+    //
+    // We also have a replicated suffix (synchronization domain)
+    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+        "Unable to add the syncrhonized server");
+    entryList.add(synchroServerEntry);
+  }
+
+  private void processModify(int count)
+  {
+    while (count>0)
+    {
+      count--;
+      // must generate the mods for every operation because they are modified
+      // by processModify.
+      List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+
+      ModifyOperation modOp =
+        connection.processModify(personEntry.getDN(), mods);
+      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
+    }
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
index d553104..f73ca4b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -27,6 +27,7 @@
 
 package org.opends.server.synchronization;
 
+import static org.opends.server.loggers.Error.logError;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -53,6 +54,8 @@
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.Modification;
 import org.opends.server.types.ModificationType;
@@ -139,96 +142,100 @@
   @Test(enabled=true, groups="slow")
   public void fromServertoBroker() throws Exception
   {
-
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting Synchronization StressTest : fromServertoBroker" , 1);
+    
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
     final int TOTAL_MESSAGES = 1000;
     cleanEntries();
 
-    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
     DirectoryServer.registerMonitorProvider(this);
 
     try {
-    /*
-     * loop receiving update until there is nothing left
-     * to make sure that message from previous tests have been consumed.
-     */
-    try
-    {
-      while (true)
+      /*
+       * loop receiving update until there is nothing left
+       * to make sure that message from previous tests have been consumed.
+       */
+      try
       {
-        broker.receive();
+        while (true)
+        {
+          broker.receive();
+        }
       }
-     }
-    catch (Exception e)
-    { }
-    /*
-     * Test that operations done on this server are sent to the
-     * changelog server and forwarded to our changelog broker session.
-     */
+      catch (Exception e)
+      { }
+      /*
+       * Test that operations done on this server are sent to the
+       * changelog server and forwarded to our changelog broker session.
+       */
 
-    // Create an Entry (add operation) that will be later used in the test.
-    Entry tmp = personEntry.duplicate();
-    AddOperation addOp = new AddOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-        .nextMessageID(), null, tmp.getDN(),
-        tmp.getObjectClasses(), tmp.getUserAttributes(),
-        tmp.getOperationalAttributes());
-    addOp.run();
-    entryList.add(personEntry);
-    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
-      "The Add Entry operation failed");
+      // Create an Entry (add operation) that will be later used in the test.
+      Entry tmp = personEntry.duplicate();
+      AddOperation addOp = new AddOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, tmp.getDN(),
+          tmp.getObjectClasses(), tmp.getUserAttributes(),
+          tmp.getOperationalAttributes());
+      addOp.run();
+      entryList.add(personEntry);
+      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+        "The Add Entry operation failed");
 
-    // Check if the client has received the msg
-    SynchronizationMessage msg = broker.receive();
-    assertTrue(msg instanceof AddMsg,
-      "The received synchronization message is not an ADD msg");
-    AddMsg addMsg =  (AddMsg) msg;
+      // Check if the client has received the msg
+      SynchronizationMessage msg = broker.receive();
+      assertTrue(msg instanceof AddMsg,
+        "The received synchronization message is not an ADD msg");
+      AddMsg addMsg =  (AddMsg) msg;
 
-    Operation receivedOp = addMsg.createOperation(connection);
-    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
-      "The received synchronization message is not an ADD msg");
+      Operation receivedOp = addMsg.createOperation(connection);
+      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+        "The received synchronization message is not an ADD msg");
 
-    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
-      "The received ADD synchronization message is not for the excepted DN");
+      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+        "The received ADD synchronization message is not for the excepted DN");
 
-    reader = new BrokerReader(broker);
-    reader.start();
+      reader = new BrokerReader(broker);
+      reader.start();
 
-    long startTime = TimeThread.getTime();
-    int count = TOTAL_MESSAGES;
+      long startTime = TimeThread.getTime();
+      int count = TOTAL_MESSAGES;
 
-    // Create a number of writer thread that will loop modifying the entry
-    List<Thread> writerThreadList = new LinkedList<Thread>();
-    for (int n = 0; n < 1; n++)
-    {
-      BrokerWriter writer = new BrokerWriter(count);
-      writerThreadList.add(writer);
-    }
-    for (Thread thread : writerThreadList)
-    {
-      thread.start();
-    }
-    // wait for all the threads to finish.
-    for (Thread thread : writerThreadList)
-    {
-      thread.join();
-    }
+      // Create a number of writer thread that will loop modifying the entry
+      List<Thread> writerThreadList = new LinkedList<Thread>();
+      for (int n = 0; n < 1; n++)
+      {
+        BrokerWriter writer = new BrokerWriter(count);
+        writerThreadList.add(writer);
+      }
+      for (Thread thread : writerThreadList)
+      {
+        thread.start();
+      }
+      // wait for all the threads to finish.
+      for (Thread thread : writerThreadList)
+      {
+        thread.join();
+      }
 
-    long afterSendTime = TimeThread.getTime();
+      long afterSendTime = TimeThread.getTime();
 
-    int rcvCount = reader.getCount();
-    long afterReceiveTime = TimeThread.getTime();
+      int rcvCount = reader.getCount();
+      
+      long afterReceiveTime = TimeThread.getTime();
 
-    if (rcvCount != TOTAL_MESSAGES)
-    {
-      fail("some messages were lost : expected : " +TOTAL_MESSAGES +
-           " received : " + rcvCount);
-    }
+      if (rcvCount != TOTAL_MESSAGES)
+      {
+        fail("some messages were lost : expected : " +TOTAL_MESSAGES +
+            " received : " + rcvCount);
+      }
 
     }
     finally {
-    DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
-    broker.stop();
+      DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+      broker.stop();
     }
   }
 
@@ -393,7 +400,7 @@
     ServerState state = new ServerState(baseDn);
     state.loadState();
     ChangelogBroker broker = new ChangelogBroker(state, baseDn,
-                                                 serverId, 0, 0, 0, 0);
+                                                 serverId, 0, 0, 0, 0, 100);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:8989");
     broker.start(servers);
@@ -441,7 +448,7 @@
     // We also have a replicated suffix (synchronization domain)
     DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
     assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
-        "Unable to add the syncrhonized server");
+        "Unable to add the synchronized server");
     entryList.add(synchroServerEntry);
   }
 
@@ -553,7 +560,9 @@
       {
         while (true)
         {
-          broker.receive();
+          SynchronizationMessage msg = broker.receive();
+          if (msg == null)
+            break;
           count ++;
         }
       } catch (Exception e) {
@@ -577,7 +586,7 @@
           return count;
         try
         {
-          this.wait();
+          this.wait(60);
           return count;
         } catch (InterruptedException e)
         {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
index 5df7ff9..8ccc759 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
@@ -438,6 +438,73 @@
     // Check that retrieved CN is OK
     msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes());
   }
+  
+  @DataProvider(name="serverStart")
+  public Object [][] createServerStartMessageTestData() throws Exception
+  {
+    DN baseDN = DN.decode("dc=example, dc=com");
+    ServerState state = new ServerState(baseDN);
+    return new Object [][] { {(short)1, baseDN, 100, state} };
+  }
+  /**
+   * Test that ServerStartMessage encoding and decoding works
+   * by checking that : msg == new ServerStartMessage(msg.getBytes()).
+   */
+  @Test(dataProvider="serverStart")
+  public void ServerStartMessageTest(short serverId, DN baseDN, int window,
+         ServerState state) throws Exception
+  {
+    state.update(new ChangeNumber((long)1, 1,(short)1));
+    ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
+        window, window, window, window, window, state);
+    ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
+    assertEquals(msg.getServerId(), newMsg.getServerId());
+    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
+        newMsg.getServerState().getMaxChangeNumber((short)1));
+  }
+  
+  @DataProvider(name="changelogStart")
+  public Object [][] createChangelogStartMessageTestData() throws Exception
+  {
+    DN baseDN = DN.decode("dc=example, dc=com");
+    ServerState state = new ServerState(baseDN);
+    return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} };
+  }
+  
+  /**
+   * Test that changelogStartMessage encoding and decoding works
+   * by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
+   */
+  @Test(dataProvider="changelogStart")
+  public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
+         String url, ServerState state) throws Exception
+  {
+    state.update(new ChangeNumber((long)1, 1,(short)1));
+    ChangelogStartMessage msg = new ChangelogStartMessage(serverId, 
+        url, baseDN, window, state);
+    ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
+    assertEquals(msg.getServerId(), newMsg.getServerId());
+    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
+        newMsg.getServerState().getMaxChangeNumber((short)1));
+  }
+  
+  /**
+   * Test that WindowMessageTest encoding and decoding works
+   * by checking that : msg == new WindowMessageTest(msg.getBytes()).
+   */
+  @Test()
+  public void WindowMessageTest() throws Exception
+  {
+    WindowMessage msg = new WindowMessage(123);
+    WindowMessage newMsg = new WindowMessage(msg.getBytes());
+    assertEquals(msg.getNumAck(), newMsg.getNumAck());
+  }
 
   /**
    * Test PendingChange
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 9345b82..245cf7b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -27,6 +27,7 @@
 
 package org.opends.server.synchronization;
 
+import static org.opends.server.loggers.Error.logError;
 import static org.testng.Assert.*;
 
 import java.net.SocketException;
@@ -325,6 +326,10 @@
   @Test(enabled=true)
   public void namingConflicts() throws Exception
   {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization test : namingConflicts" , 1);
+    
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
 
     /*
@@ -635,193 +640,198 @@
   @Test(enabled=true, dataProvider="assured")
   public void updateOperations(boolean assured) throws Exception
   {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization test : updateOperations " + assured , 1);
+    
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
 
     cleanEntries();
+    
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 27);
+    try {
+      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
 
-    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
-    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
-
-    /*
-     * loop receiving update until there is nothing left
-     * to make sure that message from previous tests have been consumed.
-     */
-    // broker.setSoTimeout(100);
-    try
-    {
-      while (true)
+      /*
+       * loop receiving update until there is nothing left
+       * to make sure that message from previous tests have been consumed.
+       */
+      try
       {
-        broker.receive();
+        while (true)
+        {
+          broker.receive();
+        }
       }
-     }
-    catch (Exception e)
-    {
-     // broker.setSoTimeout(1000);
-    }
-    /*
-     * Test that operations done on this server are sent to the
-     * changelog server and forwarded to our changelog broker session.
-     */
+      catch (Exception e)
+      {}
+      /*
+       * Test that operations done on this server are sent to the
+       * changelog server and forwarded to our changelog broker session.
+       */
 
-    // Create an Entry (add operation)
-    Entry tmp = personEntry.duplicate();
-    AddOperation addOp = new AddOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-        .nextMessageID(), null, tmp.getDN(),
-        tmp.getObjectClasses(), tmp.getUserAttributes(),
-        tmp.getOperationalAttributes());
-    addOp.run();
-    entryList.add(personEntry);
-    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+      // Create an Entry (add operation)
+      Entry tmp = personEntry.duplicate();
+      AddOperation addOp = new AddOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, tmp.getDN(),
+          tmp.getObjectClasses(), tmp.getUserAttributes(),
+          tmp.getOperationalAttributes());
+      addOp.run();
+      entryList.add(personEntry);
+      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
       "The Add Entry operation failed");
 
-    // Check if the client has received the msg
-    SynchronizationMessage msg = broker.receive();
-    assertTrue(msg instanceof AddMsg,
+      // Check if the client has received the msg
+      SynchronizationMessage msg = broker.receive();
+      assertTrue(msg instanceof AddMsg,
       "The received synchronization message is not an ADD msg");
-    AddMsg addMsg =  (AddMsg) msg;
+      AddMsg addMsg =  (AddMsg) msg;
 
-    Operation receivedOp = addMsg.createOperation(connection);
-    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+      Operation receivedOp = addMsg.createOperation(connection);
+      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
       "The received synchronization message is not an ADD msg");
 
-    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
       "The received ADD synchronization message is not for the excepted DN");
 
-    // Modify the entry
-    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+      // Modify the entry
+      List<Modification> mods = generatemods("telephonenumber", "01 02 45");
 
-    ModifyOperation modOp = new ModifyOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), mods);
-    modOp.setInternalOperation(true);
-    modOp.run();
+      ModifyOperation modOp = new ModifyOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, personEntry.getDN(), mods);
+      modOp.setInternalOperation(true);
+      modOp.run();
 
-    // See if the client has received the msg
-    msg = broker.receive();
-    assertTrue(msg instanceof ModifyMsg,
-        "The received synchronization message is not a MODIFY msg");
-    ModifyMsg modMsg = (ModifyMsg) msg;
+      // See if the client has received the msg
+      msg = broker.receive();
+      assertTrue(msg instanceof ModifyMsg,
+      "The received synchronization message is not a MODIFY msg");
+      ModifyMsg modMsg = (ModifyMsg) msg;
 
-    receivedOp = modMsg.createOperation(connection);
-    assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
-    "The received MODIFY synchronization message is not for the excepted DN");
+      receivedOp = modMsg.createOperation(connection);
+      assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+      "The received MODIFY synchronization message is not for the excepted DN");
 
-    // Modify the entry DN
-    DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
-    ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), RDN
-            .decode("uid=new person"), true, DN
-            .decode("ou=People,dc=example,dc=com"));
-    modDNOp.run();
-    assertNotNull(DirectoryServer.getEntry(newDN),
-        "The MOD_DN operation didn't create the new person entry");
-    assertNull(DirectoryServer.getEntry(personEntry.getDN()),
-        "The MOD_DN operation didn't delete the old person entry");
-    entryList.add(DirectoryServer.getEntry(newDN));
+      // Modify the entry DN
+      DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
+      ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, personEntry.getDN(), RDN
+          .decode("uid=new person"), true, DN
+          .decode("ou=People,dc=example,dc=com"));
+      modDNOp.run();
+      assertNotNull(DirectoryServer.getEntry(newDN),
+      "The MOD_DN operation didn't create the new person entry");
+      assertNull(DirectoryServer.getEntry(personEntry.getDN()),
+      "The MOD_DN operation didn't delete the old person entry");
+      entryList.add(DirectoryServer.getEntry(newDN));
 
-    // See if the client has received the msg
-    msg = broker.receive();
-    assertTrue(msg instanceof ModifyDNMsg,
-        "The received synchronization message is not a MODIFY DN msg");
-    ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
-    receivedOp = moddnMsg.createOperation(connection);
+      // See if the client has received the msg
+      msg = broker.receive();
+      assertTrue(msg instanceof ModifyDNMsg,
+      "The received synchronization message is not a MODIFY DN msg");
+      ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
+      receivedOp = moddnMsg.createOperation(connection);
 
-    assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
-        "The received MODIFY_DN message is not for the excepted DN");
+      assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+      "The received MODIFY_DN message is not for the excepted DN");
 
-    // Delete the entry
-    Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
-    DeleteOperation delOp = new DeleteOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, DN
-            .decode("uid= new person,ou=People,dc=example,dc=com"));
-    delOp.run();
-    assertNull(DirectoryServer.getEntry(newDN),
-        "Unable to delete the new person Entry");
-    entryList.remove(newPersonEntry);
+      // Delete the entry
+      Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
+      DeleteOperation delOp = new DeleteOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, DN
+          .decode("uid= new person,ou=People,dc=example,dc=com"));
+      delOp.run();
+      assertNull(DirectoryServer.getEntry(newDN),
+      "Unable to delete the new person Entry");
+      entryList.remove(newPersonEntry);
 
-    // See if the client has received the msg
-    msg = broker.receive();
-    assertTrue(msg instanceof DeleteMsg,
-        "The received synchronization message is not a MODIFY DN msg");
-    DeleteMsg delMsg = (DeleteMsg) msg;
-    receivedOp = delMsg.createOperation(connection);
-    assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
-        .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
-        "The received DELETE message is not for the excepted DN");
+      // See if the client has received the msg
+      msg = broker.receive();
+      assertTrue(msg instanceof DeleteMsg,
+      "The received synchronization message is not a MODIFY DN msg");
+      DeleteMsg delMsg = (DeleteMsg) msg;
+      receivedOp = delMsg.createOperation(connection);
+      assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
+          .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
+      "The received DELETE message is not for the excepted DN");
 
-    /*
-     * Now check that when we send message to the Changelog server
-     * and that they are received and correctly replayed by the server.
-     *
-     * Start by testing the Add message reception
-     */
-    addMsg = new AddMsg(gen.NewChangeNumber(),
-        personWithUUIDEntry.getDN().toString(),
-        user1entryUUID, baseUUID,
-        personWithUUIDEntry.getObjectClassAttribute(),
-        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
-    if (assured)
-      addMsg.setAssured();
-    broker.publish(addMsg);
+      /*
+       * Now check that when we send message to the Changelog server
+       * and that they are received and correctly replayed by the server.
+       *
+       * Start by testing the Add message reception
+       */
+      addMsg = new AddMsg(gen.NewChangeNumber(),
+          personWithUUIDEntry.getDN().toString(),
+          user1entryUUID, baseUUID,
+          personWithUUIDEntry.getObjectClassAttribute(),
+          personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+      if (assured)
+        addMsg.setAssured();
+      broker.publish(addMsg);
 
-    /*
-     * Check that the entry has been created in the local DS.
-     */
-    Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
-    assertNotNull(resultEntry,
-        "The send ADD synchronization message was not applied");
-    entryList.add(resultEntry);
+      /*
+       * Check that the entry has been created in the local DS.
+       */
+      Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
+      assertNotNull(resultEntry,
+      "The send ADD synchronization message was not applied");
+      entryList.add(resultEntry);
 
-    /*
-     * Test the reception of Modify Msg
-     */
-    modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
-                           mods, user1entryUUID);
-    if (assured)
-      modMsg.setAssured();
-    broker.publish(modMsg);
+      /*
+       * Test the reception of Modify Msg
+       */
+      modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
+          mods, user1entryUUID);
+      if (assured)
+        modMsg.setAssured();
+      broker.publish(modMsg);
 
-    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
-                           "telephonenumber", "01 02 45", 1000);
+      boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+          "telephonenumber", "01 02 45", 1000);
 
-    if (found == false)
-     fail("The modification has not been correctly replayed.");
+      if (found == false)
+        fail("The modification has not been correctly replayed.");
 
-    /*
-     * Test the Reception of Modify Dn Msg
-     */
-    moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
-                           gen.NewChangeNumber(),
-                           user1entryUUID, null,
-                           true, null, "uid= new person");
-    if (assured)
-      moddnMsg.setAssured();
-    broker.publish(moddnMsg);
+      /*
+       * Test the Reception of Modify Dn Msg
+       */
+      moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
+          gen.NewChangeNumber(),
+          user1entryUUID, null,
+          true, null, "uid= new person");
+      if (assured)
+        moddnMsg.setAssured();
+      broker.publish(moddnMsg);
 
-    resultEntry = getEntry(
-        DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true);
+      resultEntry = getEntry(
+          DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true);
 
-    assertNotNull(resultEntry,
-        "The modify DN synchronization message was not applied");
+      assertNotNull(resultEntry,
+      "The modify DN synchronization message was not applied");
 
-    /*
-     * Test the Reception of Delete Msg
-     */
-    delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
-                           gen.NewChangeNumber(), user1entryUUID);
-    if (assured)
-      delMsg.setAssured();
-    broker.publish(delMsg);
-    resultEntry = getEntry(
+      /*
+       * Test the Reception of Delete Msg
+       */
+      delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
+          gen.NewChangeNumber(), user1entryUUID);
+      if (assured)
+        delMsg.setAssured();
+      broker.publish(delMsg);
+      resultEntry = getEntry(
           DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, false);
 
-    assertNull(resultEntry,
-        "The DELETE synchronization message was not replayed");
-
-    broker.stop();
+      assertNull(resultEntry,
+      "The DELETE synchronization message was not replayed");
+    }
+    finally
+    {
+      broker.stop();
+    }
   }
 
   /**
@@ -850,7 +860,7 @@
     ServerState state = new ServerState(baseDn);
     state.loadState();
     ChangelogBroker broker = new ChangelogBroker(state, baseDn,
-                                                 serverId, 0, 0, 0, 0);
+                                                 serverId, 0, 0, 0, 0, 100);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:8989");
     broker.start(servers);
@@ -996,6 +1006,10 @@
   @Test(enabled=true)
   public void deleteNoSuchObject() throws Exception
   {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization test : deleteNoSuchObject" , 1);
+    
     DN dn = DN.decode("cn=No Such Object,ou=People,dc=example,dc=com");
     Operation op =
          new DeleteOperation(connection,
@@ -1014,12 +1028,17 @@
   @Test(enabled=true)
   public void infiniteReplayLoop() throws Exception
   {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization test : infiniteReplayLoop" , 1);
+    
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
 
-    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+    Thread.sleep(2000);
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 11);
     try
     {
-      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
+      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
 
       // Create a test entry.
       String personLdif = "dn: uid=user.2,ou=People,dc=example,dc=com\n"

--
Gitblit v1.10.0