mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
10.05.2006 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed
opends/resource/schema/02-config.ldif
@@ -547,6 +547,10 @@
  NAME 'ds-cfg-bind-with-dn-requires-password'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.288
  NAME 'ds-cfg-window-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.164
  NAME 'ds-cfg-max-receive-queue'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
@@ -1239,7 +1243,8 @@
  STRUCTURAL MUST ( ds-cfg-changelog-server $ ds-cfg-directory-server-id
  $ ds-cfg-synchronization-dn )
  MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $
  ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay )
  ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $
  ds-cfg-window-size )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.59
  NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator
@@ -1295,7 +1300,7 @@
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
  'ds-cfg-synchronization-changelog-server-config' SUP top
  STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
  MAY ( ds-cfg-changelog-server $ cn ) X-ORIGIN 'OpenDS Directory Server' )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
  SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
  X-ORIGIN 'OpenDS Directory Server' )
opends/src/server/org/opends/server/changelog/Changelog.java
@@ -71,14 +71,14 @@
 */
public class Changelog implements Runnable, ConfigurableComponent
{
  static private short serverId;
  static private String serverURL;
  private short serverId;
  private String serverURL;
  private static ServerSocket listenSocket;
  private static Thread myListenThread;
  private static Thread myConnectThread;
  private ServerSocket listenSocket;
  private Thread myListenThread;
  private Thread myConnectThread;
  private static boolean runListen = true;
  private boolean runListen = true;
  /* The list of changelog servers configured by the administrator */
  private List<String> changelogServers;
@@ -86,19 +86,22 @@
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private static HashMap<DN, ChangelogCache> baseDNs =
  private HashMap<DN, ChangelogCache> baseDNs =
          new HashMap<DN, ChangelogCache>();
  private String localURL = "null";
  private static boolean shutdown = false;
  private boolean shutdown = false;
  private short changelogServerId;
  private DN configDn;
  private List<ConfigAttribute> configAttributes =
          new ArrayList<ConfigAttribute>();
  private ChangelogDbEnv dbEnv;
  private int rcvWindow;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final IntegerConfigAttribute changelogPortStub =
    new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -114,6 +117,10 @@
        "changelog server information", true,
        true, false);
  static final IntegerConfigAttribute windowStub =
    new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
                               false, false, false, true, 0, false, 0);
  /**
   * Check if a ConfigEntry is valid.
   * @param config The config entry that needs to be checked.
@@ -229,6 +236,16 @@
    }
    configAttributes.add(changelogServer);
    IntegerConfigAttribute windowAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(windowStub);
    if (windowAttr == null)
      rcvWindow = 100;  // Attribute is not present : use the default value
    else
    {
      rcvWindow = windowAttr.activeIntValue();
      configAttributes.add(windowAttr);
    }
    initialize(changelogServerId, changelogPort);
    configDn = config.getDN();
@@ -305,9 +322,10 @@
      try
      {
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket));
        handler.start(null);
        handler.start(null, serverId, serverURL, rcvWindow, this);
      } catch (IOException e)
      {
        // ignore
@@ -378,11 +396,12 @@
      InetSocketAddress ServerAddr = new InetSocketAddress(
                     InetAddress.getByName(hostname), Integer.parseInt(port));
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.connect(ServerAddr, 500);
      ServerHandler handler = new ServerHandler(
                                      new SocketSession(socket));
      handler.start(baseDn);
      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
    }
    catch (IOException e)
    {
@@ -406,8 +425,9 @@
       * Initialize the changelog database.
       * TODO : the changelog db path should be configurable
       */
      ChangelogDB.initialize(DirectoryServer.getServerRoot() + File.separator
          + "changelogDb");
      dbEnv = new ChangelogDbEnv(
          DirectoryServer.getServerRoot() + File.separator + "changelogDb",
          this);
      /*
       * create changelog cache
@@ -421,7 +441,9 @@
      String localAdddress = InetAddress.getLocalHost().getHostAddress();
      serverURL = localhostname + ":" + String.valueOf(changelogPort);
      localURL = localAdddress + ":" + String.valueOf(changelogPort);
      listenSocket = new ServerSocket(changelogPort);
      listenSocket = new ServerSocket();
      listenSocket.setReceiveBufferSize(1000000);
      listenSocket.bind(new InetSocketAddress(changelogPort));
      /*
       * create working threads
@@ -460,32 +482,12 @@
  }
  /**
   * Retrieves the unique identifier for this changelog.
   *
   * @return  The unique identifier for this changelog.
   */
  public static short getServerId()
  {
    return serverId;
  }
  /**
   * Retrieves the host and port for this changelog, separated by a colon.
   *
   * @return  The host and port for this changelog, separated by a colon.
   */
  public static String getServerURL()
  {
    return serverURL;
  }
  /**
   * Get the ChangelogCache associated to the base DN given in parameter.
   *
   * @param baseDn The base Dn for which the ChangelogCache must be returned.
   * @return The ChangelogCache associated to the base DN given in parameter.
   */
  public static ChangelogCache getChangelogCache(DN baseDn)
  public ChangelogCache getChangelogCache(DN baseDn)
  {
    ChangelogCache changelogCache;
@@ -493,7 +495,7 @@
    {
      changelogCache = baseDNs.get(baseDn);
      if (changelogCache == null)
        changelogCache = new ChangelogCache(baseDn);
        changelogCache = new ChangelogCache(baseDn, this);
      baseDNs.put(baseDn, changelogCache);
    }
@@ -503,7 +505,7 @@
  /**
   * Shutdown the Changelog service and all its connections.
   */
  public static void shutdown()
  public void shutdown()
  {
    shutdown = true;
@@ -525,6 +527,22 @@
      changelogCache.shutdown();
    }
    ChangelogDB.shutdownDbEnvironment();
    dbEnv.shutdown();
  }
  /**
   * Creates a new DB handler for this Changelog and the serverId and
   * DN given in parameter.
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @return The new DB handler for this Changelog and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
   */
  public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
  {
    return new DbHandler(id, baseDn, this, dbEnv);
  }
}
opends/src/server/org/opends/server/changelog/ChangelogCache.java
@@ -100,15 +100,18 @@
   */
  private Map<Short, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Short, DbHandler>();
  private Changelog changelog;
  /**
   * Creates a new ChangelogCache associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ChangelogCache.
   * @param changelog the Changelog that created this changelog cache.
   */
  public ChangelogCache(DN baseDn)
  public ChangelogCache(DN baseDn, Changelog changelog)
  {
    this.baseDn = baseDn;
    this.changelog = changelog;
  }
  /**
@@ -161,7 +164,7 @@
      {
        try
        {
          dbHandler = new DbHandler(id, baseDn);
          dbHandler = changelog.newDbHandler(id, baseDn);
        } catch (DatabaseException e)
        {
          /*
@@ -169,14 +172,13 @@
           * from at least one LDAP server.
           * This changelog therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           * TODO : log error
           */
          int    msgID   = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
          String message = getMessage(msgID) + stackTraceToSingleLineString(e);
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          Changelog.shutdown();
          changelog.shutdown();
          return;
        }
        sourceDbHandlers.put(id, dbHandler);
@@ -411,14 +413,15 @@
  /**
   * creates a new ChangelogDB with specified identifier.
   * @param id the identifier of the new ChangelogDB.
   * @param baseDn the baseDn of the new ChangelogDB.
   * @param db the new db.
   *
   * @throws DatabaseException If a database error happened.
   */
  public void newDb(short id, DN baseDn) throws DatabaseException
  public void newDb(short id, DbHandler db) throws DatabaseException
  {
    synchronized (sourceDbHandlers)
    {
      sourceDbHandlers.put(id , new DbHandler(id, baseDn));
      sourceDbHandlers.put(id , db);
    }
  }
@@ -441,6 +444,16 @@
   */
  public void ack(AckMessage message, short fromServerId)
  {
    /*
     * there are 2 possible cases here :
     *  - the message that was acked comes from a server to which
     *    we are directly connected.
     *    In this case, we can find the handler from the connectedServers map
     *  - the message that was acked comes from a server to which we are not
     *    connected.
     *    In this case we need to find the changelog server that forwarded
     *    the change and send back the ack to this server.
     */
    ServerHandler handler = connectedServers.get(
                                       message.getChangeNumber().getServerId());
    if (handler != null)
@@ -551,6 +564,4 @@
  {
    return "ChangelogCache " + baseDn;
  }
}
opends/src/server/org/opends/server/changelog/ChangelogDB.java
@@ -32,10 +32,8 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
import java.io.File;
import java.io.UnsupportedEncodingException;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -45,10 +43,7 @@
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -60,143 +55,34 @@
 */
public class ChangelogDB
{
  private static Environment dbEnvironment = null;
  private Database db = null;
  private static Database stateDb = null;
  private String stringId = null;
  private ChangelogDbEnv dbenv = null;
  private Changelog changelog;
  private Short serverId;
  private DN baseDn;
  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param serverId Identifier of the LDAP server.
   * @param baseDn baseDn of the LDAP server.
   * @param changelog the Changelog that needs to be shutdown
   * @param dbenv the Db encironemnet to use to create the db
   * @throws DatabaseException if a database problem happened
   */
  public ChangelogDB(Short serverId, DN baseDn)
  public ChangelogDB(Short serverId, DN baseDn, Changelog changelog,
                     ChangelogDbEnv dbenv)
                     throws DatabaseException
  {
    try {
      stringId = serverId.toString() + " " + baseDn.toNormalizedString();
      byte[] byteId = stringId.getBytes("UTF-8");
    this.serverId = serverId;
    this.baseDn = baseDn;
    this.dbenv = dbenv;
    this.changelog = changelog;
    db = dbenv.getOrAddDb(serverId, baseDn);
      // Open the database. Create it if it does not already exist.
      DatabaseConfig dbConfig = new DatabaseConfig();
      dbConfig.setAllowCreate(true);
      dbConfig.setTransactional(true);
      db = dbEnvironment.openDatabase(null, stringId, dbConfig);
      DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
      {
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try {
          data.setData(byteId);
          stateDb.put(txn, key, data);
          txn.commitWriteNoSync();
        } catch (DatabaseException dbe)
        {
          // Abort the txn and propagate the Exception to the caller
          txn.abort();
          throw dbe;
        }
      }
    }
    catch (UnsupportedEncodingException e)
    {
      // never happens
    }
  }
  /**
   * Initialize this class.
   * Creates Db environment that will be used to create databases.
   * It also reads the currently known databases from the "changelogstate"
   * database.
   * @param path Path where the backing files must be created.
   * @throws DatabaseException If a DatabaseException occured that prevented
   *                           the initialization to happen.
   * @throws ChangelogDBException If a changelog internal error caused
   *                              a failure of the changelog processing.
   */
  public static void initialize(String path) throws DatabaseException,
                                                    ChangelogDBException
  {
    EnvironmentConfig envConfig = new EnvironmentConfig();
    /* Create the DB Environment that will be used for all
     * the Changelog activities related to the db
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam("je.cleaner.expunge", "true");
    // TODO : the DB cache size should be configurable
    // For now set 5M is OK for being efficient in 64M total for the JVM
    envConfig.setConfigParam("je.maxMemory", "5000000");
    dbEnvironment = new Environment(new File(path), envConfig);
    /*
     * One database is created to store the update from each LDAP
     * server in the topology.
     * The database "changelogstate" is used to store the list of all
     * the servers that have been seen in the past.
     */
    DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
    Cursor cursor = stateDb.openCursor(null, null);
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    try
    {
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        try
        {
          String stringData = new String(data.getData(), "UTF-8");
          String[] str = stringData.split(" ", 2);
          short serverId = new Short(str[0]);
          DN baseDn = null;
          try
          {
            baseDn = DN.decode(str[1]);
          } catch (DirectoryException e)
          {
            int    msgID   = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER;
            String message = getMessage(msgID, str[1]);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                     ErrorLogSeverity.SEVERE_ERROR,
                     message, msgID);
          }
          Changelog.getChangelogCache(baseDn).newDb(serverId, baseDn);
        } catch (NumberFormatException e)
        {
          // should never happen
          throw new ChangelogDBException(0,
              "changelog state database has a wrong format");
        } catch (UnsupportedEncodingException e)
        {
          // should never happens
          throw new ChangelogDBException(0, "need UTF-8 support");
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      cursor.close();
    } catch (DatabaseException dbe) {
      cursor.close();
      throw dbe;
    }
  }
  /**
   * add a list of changes to the underlying db.
   *
   * @param changes The list of changes to add to the underlying db.
@@ -207,7 +93,7 @@
    try
    {
      txn = dbEnvironment.beginTransaction(null, null);
      txn = dbenv.beginTransaction();
      for (UpdateMessage change : changes)
      {
@@ -224,7 +110,7 @@
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          Changelog.shutdown();
          changelog.shutdown();
        }
      }
@@ -238,7 +124,7 @@
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      Changelog.shutdown();
      changelog.shutdown();
      if (txn != null)
      {
        try
@@ -264,7 +150,7 @@
    } catch (DatabaseException e)
    {
      int    msgID   = MSGID_EXCEPTION_CLOSING_DATABASE;
      String message = getMessage(msgID, stringId)  +
      String message = getMessage(msgID, this.toString())  +
                                 stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.NOTICE,
@@ -355,7 +241,7 @@
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      Changelog.shutdown();
      changelog.shutdown();
      return null;
    }
  }
@@ -396,28 +282,18 @@
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      Changelog.shutdown();
      changelog.shutdown();
      return null;
    }
  }
  /**
   * Shutdown the Db environment.
   * {@inheritDoc}
   */
  public static void shutdownDbEnvironment()
  @Override
  public String toString()
  {
    try
    {
      stateDb.close();
      dbEnvironment.close();
    } catch (DatabaseException e)
    {
      int    msgID   = MSGID_ERROR_CLOSING_CHANGELOG_ENV;
      String message = getMessage(msgID) + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    }
    return serverId.toString() + baseDn.toString();
  }
  /**
@@ -455,7 +331,7 @@
    private ChangelogCursor() throws DatabaseException
    {
      txn = dbEnvironment.beginTransaction(null, null);
      txn = dbenv.beginTransaction();
      cursor = db.openCursor(txn, null);
    }
@@ -477,7 +353,7 @@
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        Changelog.shutdown();
        changelog.shutdown();
      }
      if (txn != null)
      {
@@ -491,7 +367,7 @@
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          Changelog.shutdown();
          changelog.shutdown();
        }
      }
    }
opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java
New file
@@ -0,0 +1,245 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.UnsupportedEncodingException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
/**
 * This class is used to represent a Db environement that can be used
 * to create ChangelogDB.
 */
public class ChangelogDbEnv
{
  private Environment dbEnvironment = null;
  private Database stateDb = null;
  private Changelog changelog = null;
  /**
   * Initialize this class.
   * Creates Db environment that will be used to create databases.
   * It also reads the currently known databases from the "changelogstate"
   * database.
   * @param path Path where the backing files must be created.
   * @param changelog the Changelog that creates this ChangelogDbEnv.
   * @throws DatabaseException If a DatabaseException occured that prevented
   *                           the initialization to happen.
   * @throws ChangelogDBException If a changelog internal error caused
   *                              a failure of the changelog processing.
   */
  public ChangelogDbEnv(String path, Changelog changelog)
         throws DatabaseException, ChangelogDBException
  {
    this.changelog = changelog;
    EnvironmentConfig envConfig = new EnvironmentConfig();
    /* Create the DB Environment that will be used for all
     * the Changelog activities related to the db
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam("je.cleaner.expunge", "true");
    // TODO : the DB cache size should be configurable
    // For now set 5M is OK for being efficient in 64M total for the JVM
    envConfig.setConfigParam("je.maxMemory", "5000000");
    dbEnvironment = new Environment(new File(path), envConfig);
    /*
     * One database is created to store the update from each LDAP
     * server in the topology.
     * The database "changelogstate" is used to store the list of all
     * the servers that have been seen in the past.
     */
    DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
    start();
  }
  /**
   * Read the list of known servers from the database and start dbHandler
   * for each of them.
   *
   * @throws DatabaseException in case of underlying DatabaseException
   * @throws ChangelogDBException when the information from the database
   *                              cannot be decoded correctly.
   */
  private void start() throws DatabaseException, ChangelogDBException
  {
    Cursor cursor = stateDb.openCursor(null, null);
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    try
    {
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        try
        {
          String stringData = new String(data.getData(), "UTF-8");
          String[] str = stringData.split(" ", 2);
          short serverId = new Short(str[0]);
          DN baseDn = null;
          try
          {
            baseDn = DN.decode(str[1]);
          } catch (DirectoryException e)
          {
            int    msgID   = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER;
            String message = getMessage(msgID, str[1]);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                     ErrorLogSeverity.SEVERE_ERROR,
                     message, msgID);
          }
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, changelog, this);
          changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler);
        } catch (NumberFormatException e)
        {
          // should never happen
          throw new ChangelogDBException(0,
              "changelog state database has a wrong format");
        } catch (UnsupportedEncodingException e)
        {
          // should never happens
          throw new ChangelogDBException(0, "need UTF-8 support");
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      cursor.close();
    } catch (DatabaseException dbe) {
      cursor.close();
      throw dbe;
    }
  }
  /**
   * Find or create the database used to store changes from the server
   * with the given serverId and the given baseDn.
   * @param serverId The server id that identifies the server.
   * @param baseDn The baseDn that identifies the server.
   * @return the Database.
   * @throws DatabaseException in case of underlying Exception.
   */
  public Database getOrAddDb(Short serverId, DN baseDn)
                  throws DatabaseException
  {
    try
    {
      String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
      byte[] byteId;
      byteId = stringId.getBytes("UTF-8");
      // Open the database. Create it if it does not already exist.
      DatabaseConfig dbConfig = new DatabaseConfig();
      dbConfig.setAllowCreate(true);
      dbConfig.setTransactional(true);
      Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
      DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
      {
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try {
          data.setData(byteId);
          stateDb.put(txn, key, data);
          txn.commitWriteNoSync();
        } catch (DatabaseException dbe)
        {
          // Abort the txn and propagate the Exception to the caller
          txn.abort();
          throw dbe;
        }
      }
      return db;
    } catch (UnsupportedEncodingException e)
    {
      // can't happen
      return null;
    }
  }
  /**
   * Creates a new transaction.
   *
   * @return the transaction.
   * @throws DatabaseException in case of underlying database Exception.
   */
  public Transaction beginTransaction() throws DatabaseException
  {
    return dbEnvironment.beginTransaction(null, null);
  }
  /**
   * Shutdown the Db environment.
   */
  public void shutdown()
  {
    try
    {
      stateDb.close();
      dbEnvironment.close();
    } catch (DatabaseException e)
    {
      int    msgID   = MSGID_ERROR_CLOSING_CHANGELOG_ENV;
      String message = getMessage(msgID) + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    }
  }
}
opends/src/server/org/opends/server/changelog/DbHandler.java
@@ -87,13 +87,17 @@
   *
   * @param id Identifier of the DB.
   * @param baseDn of the DB.
   * @param changelog the Changelog that creates this dbHandler.
   * @param dbenv the Database Env to use to create the Changelog DB.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn) throws DatabaseException
  public DbHandler(short id, DN baseDn, Changelog changelog,
      ChangelogDbEnv dbenv)
         throws DatabaseException
  {
    this.serverId = id;
    this.baseDn = baseDn;
    db = new ChangelogDB(id, baseDn);
    db = new ChangelogDB(id, baseDn, changelog, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
    thread = new DirectoryThread(this, "changelog db " + id + " " +  baseDn);
@@ -245,6 +249,10 @@
        {}
      }
    }
    while (msgQueue.size() != 0)
      flush();
    db.shutdown();
  }
opends/src/server/org/opends/server/changelog/ServerHandler.java
@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
@@ -58,6 +59,7 @@
import org.opends.server.synchronization.ServerState;
import org.opends.server.synchronization.SynchronizationMessage;
import org.opends.server.synchronization.UpdateMessage;
import org.opends.server.synchronization.WindowMessage;
import org.opends.server.util.TimeThread;
/**
@@ -71,7 +73,7 @@
  private MsgQueue msgQueue = new MsgQueue();
  private MsgQueue lateQueue = new MsgQueue();
  private Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();;
          new HashMap<ChangeNumber, AckMessageList>();
  private ChangelogCache changelogCache = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
@@ -93,6 +95,12 @@
  private ServerWriter writer = null;
  private DN baseDn = null;
  private String serverAddressURL;
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
  private ServerReader reader;
  private Semaphore sendWindow;
  private int sendWindowSize;
  private static Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -116,20 +124,29 @@
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection.
   * @param changelogId The identifier of the changelog that creates this
   *                    server handler.
   * @param changelogURL The URL of the changelog that creates this
   *                    server handler.
   * @param windowSize the window size that this server handler must use.
   * @param changelog the Changelog that created this server handler.
   */
  public void start(DN baseDn)
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  {
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    try
    {
      if (baseDn != null)
      {
        this.baseDn = baseDn;
        changelogCache = Changelog.getChangelogCache(baseDn);
        changelogCache = changelog.getChangelogCache(baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage msg =
          new ChangelogStartMessage(Changelog.getServerId(),
                                    Changelog.getServerURL(),
                                    baseDn, localServerState);
          new ChangelogStartMessage(changelogId, changelogURL,
                                    baseDn, windowSize, localServerState);
        session.publish(msg);
      }
@@ -175,16 +192,15 @@
          restartSendDelay = 0;
        serverIsLDAPserver = true;
        changelogCache = Changelog.getChangelogCache(this.baseDn);
        changelogCache = changelog.getChangelogCache(this.baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage myStartMsg =
          new ChangelogStartMessage(Changelog.getServerId(),
                                    Changelog.getServerURL(),
                                    this.baseDn, localServerState);
          new ChangelogStartMessage(changelogId, changelogURL,
                                    this.baseDn, windowSize, localServerState);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else if (msg.getClass() == Class.forName(
      "org.opends.server.synchronization.ChangelogStartMessage"))
      else if (msg instanceof ChangelogStartMessage)
      {
        ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
        serverId = receivedMsg.getServerId();
@@ -195,17 +211,17 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          changelogCache = Changelog.getChangelogCache(this.baseDn);
          changelogCache = changelog.getChangelogCache(this.baseDn);
          ServerState serverState = changelogCache.getDbServerState();
          ChangelogStartMessage outMsg =
            new ChangelogStartMessage(Changelog.getServerId(),
                                      Changelog.getServerURL(),
                                      this.baseDn, serverState);
            new ChangelogStartMessage(changelogId, changelogURL,
                                      this.baseDn, windowSize, serverState);
          session.publish(outMsg);
        }
        else
          this.baseDn = baseDn;
        this.serverState = receivedMsg.getServerState();
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else
      {
@@ -213,7 +229,7 @@
        return;   // we did not recognize the message, ignore it
      }
      changelogCache = Changelog.getChangelogCache(this.baseDn);
      changelogCache = changelog.getChangelogCache(this.baseDn);
      if (serverIsLDAPserver)
      {
@@ -226,7 +242,7 @@
      writer = new ServerWriter(session, serverId, this, changelogCache);
      ServerReader reader = new ServerReader(session, serverId, this,
      reader = new ServerReader(session, serverId, this,
                                             changelogCache);
      reader.start();
@@ -251,7 +267,7 @@
        // ignore
      }
    }
    sendWindow = new Semaphore(sendWindowSize);
  }
  /**
@@ -576,6 +592,30 @@
   */
  public UpdateMessage take()
  {
    boolean interrupted = true;
    UpdateMessage msg = getnextMessage();
    do {
      try
      {
        sendWindow.acquire();
        interrupted = false;
      } catch (InterruptedException e)
      {
        // loop until not interrupted
      }
    } while (interrupted);
    this.incrementOutCount();
    return msg;
  }
  /**
   * Get the next update that must be sent to the server
   * from the message queue or from the database.
   *
   * @return The next update that must be sent to the server.
   */
  private UpdateMessage getnextMessage()
  {
    UpdateMessage msg;
    do
    {
@@ -668,7 +708,6 @@
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                this.updateServerState(msg);
                this.incrementOutCount();
                return msg;
              }
            }
@@ -679,7 +718,6 @@
          /* get the next change from the lateQueue */
          msg = lateQueue.removeFirst();
          this.updateServerState(msg);
          this.incrementOutCount();
          return msg;
        }
      }
@@ -707,7 +745,6 @@
             * by the other server.
             * Otherwise just loop to select the next message.
             */
            this.incrementOutCount();
            return msg;
          }
        }
@@ -927,7 +964,7 @@
  @Override
  public String getMonitorInstanceName()
  {
    String str = changelogCache.getBaseDn().toString() +
    String str = baseDn.toString() +
                 " " + serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
@@ -985,7 +1022,7 @@
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
                                 changelogCache.getBaseDn().toString()));
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("update-waiting-acks",
@@ -999,6 +1036,14 @@
                                 String.valueOf(getInAckCount())));
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    attributes.add(new Attribute("max-send-window",
                                 String.valueOf(sendWindowSize)));
    attributes.add(new Attribute("current-send-window",
                                String.valueOf(sendWindow.availablePermits())));
    attributes.add(new Attribute("max-rcv-window",
                                 String.valueOf(maxRcvWindow)));
    attributes.add(new Attribute("current-rcv-window",
                                 String.valueOf(rcvWindow)));
    long olderUpdateTime = getOlderUpdateTime();
    if (olderUpdateTime != 0)
    {
@@ -1058,4 +1103,33 @@
    return localString;
  }
  /**
   * Check the protocol window and send WindowMessage if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    rcvWindow--;
    if (rcvWindow < rcvWindowSizeHalf)
    {
      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
      session.publish(msg);
      outAckCount++;
      rcvWindow += rcvWindowSizeHalf;
    }
  }
  /**
   * Update the send window size based on the credit specified in the
   * given window message.
   *
   * @param windowMsg The Window Message containing the information
   *                  necessary for updating the window size.
   */
  public void updateWindow(WindowMessage windowMsg)
  {
    sendWindow.release(windowMsg.getNumAck());
  }
}
opends/src/server/org/opends/server/changelog/ServerReader.java
@@ -36,6 +36,7 @@
import org.opends.server.synchronization.AckMessage;
import org.opends.server.synchronization.SynchronizationMessage;
import org.opends.server.synchronization.UpdateMessage;
import org.opends.server.synchronization.WindowMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -105,8 +106,14 @@
        else if (msg instanceof UpdateMessage)
        {
          UpdateMessage update = (UpdateMessage) msg;
          handler.checkWindow();
          changelogCache.put(update, handler);
        }
        else if (msg instanceof WindowMessage)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          handler.updateWindow(windowMsg);
        }
      }
    } catch (IOException e)
    {
opends/src/server/org/opends/server/changelog/SocketSession.java
@@ -60,6 +60,10 @@
  public SocketSession(Socket socket) throws IOException
  {
    this.socket = socket;
    /*
     * Use a window instead of the TCP flow control.
     * Therefore set a very large value for send and receive buffer sizes.
     */
    input = socket.getInputStream();
    output = socket.getOutputStream();
  }
opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -78,6 +79,12 @@
  private int maxReceiveDelay;
  private int maxSendQueue;
  private int maxReceiveQueue;
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow;
  private int halfRcvWindow;
  private int maxRcvWindow;
  private int timeout = 0;
  /**
   * Creates a new Changelog Broker for a particular SynchronizationDomain.
@@ -95,10 +102,11 @@
   * @param maxSendQueue The maximum size of the send queue to use on
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   * @param window The size of the send and receive window to use.
   */
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay )
      int maxSendDelay, int window)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -109,6 +117,9 @@
    this.state = state;
    replayOperations =
      new TreeSet<FakeOperation>(new FakeOperationComparator());
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
  }
  /**
@@ -165,6 +176,7 @@
          InetSocketAddress ServerAddr = new InetSocketAddress(
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
@@ -173,7 +185,7 @@
           */
          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              state);
              halfRcvWindow*2, state);
          session.publish(msg);
@@ -182,7 +194,7 @@
           */
          session.setSoTimeout(1000);
          startMsg = (ChangelogStartMessage) session.receive();
          session.setSoTimeout(0);
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a changelog that has not
@@ -202,6 +214,8 @@
              (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
          {
            changelogServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            break;
          }
@@ -254,6 +268,8 @@
              else
              {
                changelogServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
                for (FakeOperation replayOp : replayOperations)
                {
@@ -306,6 +322,14 @@
           * changes that this server has already processed, start again
           * the loop looking for any changelog server.
           */
          try
          {
            Thread.sleep(500);
          } catch (InterruptedException e)
          {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
@@ -393,13 +417,18 @@
        {
          if (this.connected == false)
            this.reStart(failingSession);
          if (msg instanceof UpdateMessage)
            sendWindow.acquire();
          session.publish(msg);
          done = true;
        } catch (IOException e)
        {
          this.reStart(failingSession);
        }
        catch (InterruptedException e)
        {
          this.reStart(failingSession);
        }
      }
    }
  }
@@ -418,7 +447,25 @@
      ProtocolSession failingSession = session;
      try
      {
        return session.receive();
        SynchronizationMessage msg = session.receive();
        if (msg instanceof WindowMessage)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else
        {
          if (msg instanceof UpdateMessage)
          {
            rcvWindow--;
            if (rcvWindow < halfRcvWindow)
            {
              session.publish(new WindowMessage(halfRcvWindow));
              rcvWindow += halfRcvWindow;
            }
          }
          return msg;
        }
      } catch (Exception e)
      {
        if (e instanceof SocketTimeoutException)
@@ -485,6 +532,7 @@
   */
  public void setSoTimeout(int timeout) throws SocketException
  {
    this.timeout = timeout;
    session.setSoTimeout(timeout);
  }
@@ -532,4 +580,47 @@
  {
    // TODO to be implemented
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
   */
  public int getMaxRcvWindow()
  {
    return maxRcvWindow;
  }
  /**
   * Get the current receive window size.
   *
   * @return The current receive window size.
   */
  public int getCurrentRcvWindow()
  {
    return rcvWindow;
  }
  /**
   * Get the maximum send window size.
   *
   * @return The maximum send window size.
   */
  public int getMaxSendWindow()
  {
    return maxSendWindow;
  }
  /**
   * Get the current send window size.
   *
   * @return The current send window size.
   */
  public int getCurrentSendWindow()
  {
    if (connected)
      return sendWindow.availablePermits();
    else
      return 0;
  }
}
opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
@@ -46,15 +46,19 @@
  private String serverURL;
  private ServerState serverState;
  private int windowSize;
  /**
   * Create a ChangelogStartMessage.
   *
   * @param serverId changelog server id
   * @param serverURL changelog server URL
   * @param baseDn base DN for which the ChangelogStartMessage is created.
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   */
  public ChangelogStartMessage(short serverId, String serverURL, DN baseDn,
                               int windowSize,
                               ServerState serverState)
  {
    this.serverId = serverId;
@@ -63,6 +67,7 @@
      this.baseDn = baseDn.toNormalizedString();
    else
      this.baseDn = null;
    this.windowSize = windowSize;
    this.serverState = serverState;
  }
@@ -76,7 +81,7 @@
  public ChangelogStartMessage(byte[] in) throws DataFormatException
  {
    /* The ChangelogStartMessage is encoded in the form :
     * <baseDn><ServerId><ServerUrl><ServerState>
     * <baseDn><ServerId><ServerUrl><windowsize><ServerState>
     */
    try
    {
@@ -108,6 +113,13 @@
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
      * read the ServerState
      */
      serverState = new ServerState(in, pos, in.length-1);
@@ -179,16 +191,18 @@
  public byte[] getBytes()
  {
    /* The ChangelogStartMessage is stored in the form :
     * <operation type><basedn><serverid><serverURL><serverState>
     * <operation type><basedn><serverid><serverURL><windowsize><serverState>
     */
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
      byteServerUrl.length + 1 + byteServerState.length + 1;
          byteServerUrl.length + 1 + byteWindowSize.length + 1 +
          byteServerState.length + 1;
      byte[] resultByteArray = new byte[length];
@@ -205,6 +219,9 @@
      /* put the ServerURL */
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      /* put the window size */
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      /* put the ServerState */
      pos = addByteArray(byteServerState, resultByteArray, pos);
@@ -215,4 +232,14 @@
      return null;
    }
  }
  /**
   * get the window size for the server that created this message.
   *
   * @return The window size for the server that created this message.
   */
  public int getWindowSize()
  {
    return windowSize;
  }
}
opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
@@ -446,7 +446,7 @@
    // shutdown the Changelog Service if necessary
    if (changelog != null)
      Changelog.shutdown();
      changelog.shutdown();
  }
  /**
opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
@@ -52,6 +52,7 @@
  private int maxSendQueue;
  private int maxReceiveDelay;
  private int maxSendDelay;
  private int windowSize;
  private ServerState serverState = null;
  /**
@@ -64,11 +65,13 @@
   * @param maxReceiveQueue The max receive Queue for this server.
   * @param maxSendDelay The max Send Delay from this server.
   * @param maxSendQueue The max send Queue from this server.
   * @param windowSize   The window size used by this server.
   * @param serverState  The state of this server.
   */
  public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                            int maxReceiveQueue, int maxSendDelay,
                            int maxSendQueue, ServerState serverState)
                            int maxSendQueue, int windowSize,
                            ServerState serverState)
  {
    this.serverId = serverId;
    this.baseDn = baseDn.toString();
@@ -77,6 +80,7 @@
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    this.serverState = serverState;
    this.windowSize = windowSize;
    try
    {
@@ -100,7 +104,7 @@
  {
    /* The ServerStartMessage is encoded in the form :
     * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><ServerState>
     * <maxSendDelay><maxSendQueue><window><ServerState>
     */
    try
    {
@@ -161,6 +165,13 @@
      pos += length +1;
      /*
       * read the windowSize
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
      * read the ServerState
      */
      serverState = new ServerState(in, pos, in.length-1);
@@ -269,7 +280,7 @@
    /*
     * ServerStartMessage contains.
     * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><ServerState>
     * <maxSendDelay><maxSendQueue><windowsize><ServerState>
     */
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -283,6 +294,8 @@
                     String.valueOf(maxSendDelay).getBytes("UTF-8");
      byte[] byteMaxSendQueue =
                     String.valueOf(maxSendQueue).getBytes("UTF-8");
      byte[] byteWindowSize =
                     String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
@@ -291,6 +304,7 @@
                   byteMaxRecvQueue.length + 1 +
                   byteMaxSendDelay.length + 1 +
                   byteMaxSendQueue.length + 1 +
                   byteWindowSize.length + 1 +
                   byteServerState.length + 1;
      byte[] resultByteArray = new byte[length];
@@ -313,6 +327,8 @@
      pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
@@ -322,4 +338,14 @@
      return null;
    }
  }
  /**
   * Get the window size for the ldap server that created the message.
   *
   * @return The window size for the ldap server that created the message.
   */
  public int getWindowSize()
  {
    return windowSize;
  }
}
opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -138,6 +138,7 @@
  static String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay";
  static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
  static String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
  static String WINDOW_SIZE = "ds-cfg-window-size";
  private static final StringConfigAttribute changelogStub =
    new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
@@ -275,6 +276,20 @@
      configAttributes.add(maxSendDelayAttr);
    }
    Integer window;
    IntegerConfigAttribute windowStub =
      new IntegerConfigAttribute(WINDOW_SIZE, "window size",
                                 false, false, false, true, 0, false, 0);
    IntegerConfigAttribute windowAttr =
      (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub);
    if (windowAttr == null)
      window = 100;  // Attribute is not present : use the default value
    else
    {
      window = windowAttr.activeIntValue();
      configAttributes.add(windowAttr);
    }
    configDn = configEntry.getDN();
    DirectoryServer.registerConfigurableComponent(this);
@@ -292,7 +307,7 @@
    try
    {
      broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
          maxReceiveDelay, maxSendQueue, maxSendDelay);
          maxReceiveDelay, maxSendQueue, maxSendDelay, window);
      synchronized (broker)
      {
        broker.start(changelogServers);
@@ -882,10 +897,7 @@
   */
  public int getPendingUpdatesCount()
  {
    synchronized (pendingChanges)
    {
      return pendingChanges.size();
    }
    return pendingChanges.size();
  }
  /**
@@ -1662,4 +1674,44 @@
    }
    return true;
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
   */
  public int getMaxRcvWindow()
  {
    return broker.getMaxRcvWindow();
  }
  /**
   * Get the current receive window size.
   *
   * @return The current receive window size.
   */
  public int getCurrentRcvWindow()
  {
    return broker.getCurrentRcvWindow();
  }
  /**
   * Get the maximum send window size.
   *
   * @return The maximum send window size.
   */
  public int getMaxSendWindow()
  {
    return broker.getMaxSendWindow();
  }
  /**
   * Get the current send window size.
   *
   * @return The current send window size.
   */
  public int getCurrentSendWindow()
  {
    return broker.getCurrentSendWindow();
  }
}
opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
@@ -46,6 +46,7 @@
  static final byte MSG_TYPE_ACK = 5;
  static final byte MSG_TYPE_SERVER_START = 6;
  static final byte MSG_TYPE_CHANGELOG_START = 7;
  static final byte MSG_TYPE_WINDOW = 8;
  /**
   * Do the processing necessary when the message is received.
@@ -106,6 +107,9 @@
      case MSG_TYPE_CHANGELOG_START:
        msg = new ChangelogStartMessage(buffer);
      break;
      case MSG_TYPE_WINDOW:
        msg = new WindowMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
@@ -98,64 +98,37 @@
    attributes.add(attr);
    /* get number of received updates */
    final String ATTR_UPDATE_RECVD = "received-updates";
    AttributeType type =
                    DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_RECVD);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type,
                                  String.valueOf(domain.getNumRcvdUpdates())));
    attr = new Attribute(type, "received-updates", values);
    attributes.add(attr);
    addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates());
    /* get number of updates sent */
    final String ATTR_UPDATE_SENT = "sent-updates";
    type =  DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_SENT);
    values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type,
                                  String.valueOf(domain.getNumSentUpdates())));
    attr = new Attribute(type, "sent-updates", values);
    attributes.add(attr);
    addMonitorData(attributes, "sent-updates", domain.getNumSentUpdates());
    /* get number of changes in the pending list */
    final String ATTR_UPDATE_PENDING = "pending-updates";
    type =  DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_PENDING);
    values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type,
                              String.valueOf(domain.getPendingUpdatesCount())));
    attr = new Attribute(type, "pending-updates", values);
    attributes.add(attr);
    addMonitorData(attributes, "pending-updates",
                   domain.getPendingUpdatesCount());
    /* get number of changes replayed */
    final String ATTR_REPLAYED_UPDATE = "replayed-updates";
    type =  DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE);
    values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type,
                              String.valueOf(domain.getNumProcessedUpdates())));
    attr = new Attribute(type, ATTR_REPLAYED_UPDATE, values);
    attributes.add(attr);
    addMonitorData(attributes, "replayed-updates",
                   domain.getNumProcessedUpdates());
    /* get number of changes successfully */
    final String ATTR_REPLAYED_UPDATE_OK = "replayed-updates-ok";
    type =  DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE_OK);
    values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type,
                          String.valueOf(domain.getNumReplayedPostOpCalled())));
    attr = new Attribute(type, ATTR_REPLAYED_UPDATE_OK, values);
    attributes.add(attr);
    addMonitorData(attributes, "replayed-updates-ok",
                   domain.getNumReplayedPostOpCalled());
    /* get debugCount */
    final String DEBUG_COUNT = "debug-count";
    type =  DirectoryServer.getDefaultAttributeType(DEBUG_COUNT);
    values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type,
                          String.valueOf(domain.getDebugCount())));
    attr = new Attribute(type, DEBUG_COUNT, values);
    attributes.add(attr);
    /* get window information */
    addMonitorData(attributes, "max-rcv-window", domain.getMaxRcvWindow());
    addMonitorData(attributes, "current-rcv-window",
                               domain.getCurrentRcvWindow());
    addMonitorData(attributes, "max-send-window",
                               domain.getMaxSendWindow());
    addMonitorData(attributes, "current-send-window",
                               domain.getCurrentSendWindow());
    /* get the Server State */
    final String ATTR_SERVER_STATE = "server-state";
    type =  DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
    values = new LinkedHashSet<AttributeValue>();
    AttributeType type =
      DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    for (String str : domain.getServerState().toStringSet())
    {
      values.add(new AttributeValue(type,str));
@@ -168,6 +141,27 @@
  }
  /**
   * Add an attribute with an integer value to the list of monitoring
   * attributes.
   *
   * @param attributes the list of monitoring attributes
   * @param name the name of the attribute to add.
   * @param value The integer value of he attribute to add.
   */
  private void addMonitorData(ArrayList<Attribute> attributes,
       String name, int value)
  {
    Attribute attr;
    AttributeType type;
    LinkedHashSet<AttributeValue> values;
    type =  DirectoryServer.getDefaultAttributeType(name);
    values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(type, String.valueOf(value)));
    attr = new Attribute(type, name, values);
    attributes.add(attr);
  }
  /**
   * Retrieves the length of time in milliseconds that should elapse between
   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
   * return value indicates that the <CODE>updateMonitorData()</CODE> method
opends/src/server/org/opends/server/synchronization/WindowMessage.java
New file
@@ -0,0 +1,141 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP server when they first connect.
 * to a changelog server to let them know who they are and what is their state
 * (their RUV)
 */
public class WindowMessage extends SynchronizationMessage implements
    Serializable
{
  private static final long serialVersionUID = 8442267608764026867L;
  private final int numAck;
  /**
   * Create a new WindowMessage.
   *
   * @param numAck The number of acknowledged messages.
   *               The window will be increase by this number.
   */
  public WindowMessage(int numAck)
  {
    this.numAck = numAck;
  }
  /**
   * Creates a new WindowMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the
   *           WindowMessage.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMessage.
   */
  public WindowMessage(byte[] in) throws DataFormatException
  {
    /* The WindowMessage is encoded in the form :
     * <numAck>
     */
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_WINDOW)
        throw new DataFormatException("input is not a valid Window Message");
      int pos = 1;
      /*
       * read the number of acks contained in this message.
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      String numAckStr = new String(in, pos, length, "UTF-8");
      pos += length +1;
      numAck = Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    /*
     * WindowMessage contains.
     * <numAck>
     */
    try {
      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
      int length = 1 + byteNumAck.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_WINDOW;
      int pos = 1;
      pos = addByteArray(byteNumAck, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
  /**
   * Get the number of message acknowledged by the Window Message.
   *
   * @return the number of message acknowledged by the Window Message.
   */
  public int getNumAck()
  {
    return numAck;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public UpdateMessage processReceive(SynchronizationDomain domain)
  {
    return null;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -70,6 +70,14 @@
       "org.opends.server.BuildRoot";
  /**
   * The name of the system property that specifies the ldap port.
   * Set this prtoperty when running the server if you want to use a given
   * port number, otherwise a port is choosed randomly at test startup time.
   */
  public static final String PROPERTY_LDAP_PORT =
       "org.opends.server.LdapPort";
  /**
   * The string representation of the DN that will be used as the base entry for
   * the test backend.  This must not be changed, as there are a number of test
   * cases that depend on this specific value of "o=test".
@@ -205,8 +213,17 @@
    ServerSocket serverJmxSocket   = null;
    ServerSocket serverLdapsSocket = null;
    serverLdapSocket = bindFreePort();
    serverLdapPort = serverLdapSocket.getLocalPort();
    String ldapPort = System.getProperty(PROPERTY_LDAP_PORT);
    if (ldapPort == null)
    {
      serverLdapSocket = bindFreePort();
      serverLdapPort = serverLdapSocket.getLocalPort();
    }
    else
    {
      serverLdapPort = Integer.valueOf(ldapPort);
      serverLdapSocket = bindPort(serverLdapPort);
    }
    serverJmxSocket = bindFreePort();
    serverJmxPort = serverJmxSocket.getLocalPort();
@@ -263,6 +280,23 @@
  }
  /**
   * Binds to the given socket port on the local host.
   * @return the bounded Server socket.
   *
   * @throws IOException in case of underlying exception.
   * @throws SocketException in case of underlying exception.
   */
  private static ServerSocket bindPort(int port)
          throws IOException, SocketException
  {
    ServerSocket serverLdapSocket;
    serverLdapSocket = new ServerSocket();
    serverLdapSocket.setReuseAddress(true);
    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port));
    return serverLdapSocket;
  }
  /**
   * Find and binds to a free server socket port on the local host.
   * @return the bounded Server socket.
   *
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
New file
@@ -0,0 +1,494 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.*;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchScope;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test the contructors, encoders and decoders of the synchronization AckMsg,
 * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
 */
public class ProtocolWindowTest
{
  private static final int WINDOW_SIZE = 10;
  private static final String SYNCHRONIZATION_STRESS_TEST =
    "Synchronization Stress Test";
  /**
   * The internal connection used for operation
   */
  private InternalClientConnection connection;
  /**
   * Created entries that need to be deleted for cleanup
   */
  private ArrayList<Entry> entryList = new ArrayList<Entry>();
  /**
   * The Synchronization config manager entry
   */
  private String synchroStringDN;
  /**
   * The synchronization plugin entry
   */
  private String synchroPluginStringDN;
  private Entry synchroPluginEntry;
  /**
   * The Server synchro entry
   */
  private String synchroServerStringDN;
  private Entry synchroServerEntry;
  /**
   * The Change log entry
   */
  private String changeLogStringDN;
  private Entry changeLogEntry;
  /**
   * A "person" entry
   */
  private Entry personEntry;
  /**
   * schema check flag
   */
  private boolean schemaCheck;
  // WORKAROUND FOR BUG #639 - BEGIN -
  /**
   *
   */
  MultimasterSynchronization mms;
  // WORKAROUND FOR BUG #639 - END -
  /**
   * Test the window mechanism by :
   *  - creating a Changelog service client using the ChangelogBroker class.
   *  - set a small window size.
   *  - perform more than the window size operations.
   *  - check that the Changelog has not sent more than window size operations.
   *  - receive all messages from the ChangelogBroker, check that
   *    the client receives the correct number of operations.
   */
  @Test(enabled=true, groups="slow")
  public void saturateAndRestart() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    cleanEntries();
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 13);
    try {
      /* Test that changelog monitor and synchro plugin monitor informations
       * publish the correct window size.
       * This allows both the check the monitoring code and to test that
       * configuration is working.
       */
      Thread.sleep(1500);
      assertTrue(checkWindows(WINDOW_SIZE));
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate();
      AddOperation addOp = new AddOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, tmp.getDN(),
          tmp.getObjectClasses(), tmp.getUserAttributes(),
          tmp.getOperationalAttributes());
      addOp.run();
      entryList.add(personEntry);
      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
        "The Add Entry operation failed");
      // Check if the client has received the msg
      SynchronizationMessage msg = broker.receive();
      assertTrue(msg instanceof AddMsg,
        "The received synchronization message is not an ADD msg");
      AddMsg addMsg =  (AddMsg) msg;
      Operation receivedOp = addMsg.createOperation(connection);
      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
        "The received synchronization message is not an ADD msg");
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD synchronization message is not for the excepted DN");
      // send twice the window modify operations
      int count = WINDOW_SIZE * 2;
      processModify(count);
      // let some time to the message to reach the changelog client
      Thread.sleep(500);
      // check that the changelog only sent WINDOW_SIZE messages
      assertTrue(searchUpdateSent());
      int rcvCount=0;
      try
      {
        while (true)
        {
          broker.receive();
          rcvCount++;
        }
      }
      catch (SocketTimeoutException e)
      {}
      /*
       * check that we received all updates
       */
      assertEquals(rcvCount, WINDOW_SIZE*2);
    }
    finally {
      broker.stop();
      DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
    }
  }
  /**
   * Check that the window configuration has been successfull
   * by reading the monitoring information and checking
   * that we do have 2 entries with the configured max-rcv-window.
   */
  private boolean checkWindows(int windowSize) throws LDAPException
  {
    InternalSearchOperation op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(max-rcv-window=" + windowSize + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 3);
  }
  /**
   * Search that the changelog has stopped sending changes after
   * having reach the limit of the window size.
   * Do this by checking the monitoring information.
   */
  private boolean searchUpdateSent() throws Exception
  {
    InternalSearchOperation op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 1);
  }
  /**
   * Set up the environment for performing the tests in this Class.
   * synchronization
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  public void setUp() throws Exception
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = new InternalClientConnection();
    // Create backend top level entries
    String[] topEntries = new String[2];
    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: domain\n";
    topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
    Entry entry;
    for (int i = 0; i < topEntries.length; i++)
    {
      entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
      AddOperation addOp = new AddOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
              .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
          entry.getUserAttributes(), entry.getOperationalAttributes());
      addOp.setInternalOperation(true);
      addOp.run();
      entryList.add(entry);
    }
    // top level synchro provider
    synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Multimaster Synchro plugin
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
        + synchroStringDN;
    String synchroPluginLdif = "dn: "
        + synchroPluginStringDN
        + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider\n"
        + "ds-cfg-synchronization-provider-enabled: true\n"
        + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
    synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
    // Change log
    changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
        + "ds-cfg-changelog-server-id: 1\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    // suffix synchronized
    synchroServerStringDN = "cn=example, " + synchroPluginStringDN;
    String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider-config\n"
        + "cn: example\n"
        + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
        + "ds-cfg-changelog-server: localhost:8989\n"
        + "ds-cfg-directory-server-id: 1\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: person\n"
        + "objectClass: organizationalPerson\n"
        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
        + "homePhone: 951-245-7634\n"
        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
        + "mobile: 027-085-0537\n"
        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
        + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
        + "street: 17984 Thirteenth Street\n"
        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
        + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
        + "userPassword: password\n" + "initials: AA\n";
    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
    configureSynchronization();
  }
  /**
   * Clean up the environment. return null;
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @AfterClass
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    // WORKAROUND FOR BUG #639 - BEGIN -
    DirectoryServer.deregisterSynchronizationProvider(mms);
    mms.finalizeSynchronizationProvider();
    // WORKAROUND FOR BUG #639 - END -
    cleanEntries();
  }
  /**
   * suppress all the entries created by the tests in this class
   */
  private void cleanEntries()
  {
    DeleteOperation op;
    // Delete entries
    Entry entries[] = entryList.toArray(new Entry[0]);
    for (int i = entries.length - 1; i != 0; i--)
    {
      try
      {
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            entries[i].getDN());
        op.run();
      } catch (Exception e)
      {
      }
    }
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  /**
   * Open a changelog session to the local Changelog server.
   *
   */
  private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
          throws Exception, SocketException
  {
    ServerState state = new ServerState(baseDn);
    state.loadState();
    ChangelogBroker broker =
      new ChangelogBroker(state, baseDn, serverId, 0, 0, 0, 0, WINDOW_SIZE);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:8989");
    broker.start(servers);
    broker.setSoTimeout(5000);
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    try
    {
      while (true)
      {
        broker.receive();
      }
    }
    catch (Exception e)
    { }
    return broker;
  }
  /**
   * Configure the Synchronization for this test.
   */
  private void configureSynchronization() throws Exception
  {
    //
    // Add the Multimaster synchronization plugin
    DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
    entryList.add(synchroPluginEntry);
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster synchronization plugin");
    // WORKAROUND FOR BUG #639 - BEGIN -
    DN dn = DN.decode(synchroPluginStringDN);
    ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
    mms = new MultimasterSynchronization();
    try
    {
      mms.initializeSynchronizationProvider(mmsConfigEntry);
    }
    catch (ConfigException e)
    {
      assertTrue(false,
          "Unable to initialize the Multimaster synchronization plugin");
    }
    DirectoryServer.registerSynchronizationProvider(mms);
    // WORKAROUND FOR BUG #639 - END -
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry);
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the syncrhonized server");
    entryList.add(synchroServerEntry);
  }
  private void processModify(int count)
  {
    while (count>0)
    {
      count--;
      // must generate the mods for every operation because they are modified
      // by processModify.
      List<Modification> mods = generatemods("telephonenumber", "01 02 45");
      ModifyOperation modOp =
        connection.processModify(personEntry.getDN(), mods);
      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -53,6 +54,8 @@
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
@@ -139,96 +142,100 @@
  @Test(enabled=true, groups="slow")
  public void fromServertoBroker() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting Synchronization StressTest : fromServertoBroker" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    final int TOTAL_MESSAGES = 1000;
    cleanEntries();
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
    DirectoryServer.registerMonitorProvider(this);
    try {
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    try
    {
      while (true)
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        broker.receive();
        while (true)
        {
          broker.receive();
        }
      }
     }
    catch (Exception e)
    { }
    /*
     * Test that operations done on this server are sent to the
     * changelog server and forwarded to our changelog broker session.
     */
      catch (Exception e)
      { }
      /*
       * Test that operations done on this server are sent to the
       * changelog server and forwarded to our changelog broker session.
       */
    // Create an Entry (add operation) that will be later used in the test.
    Entry tmp = personEntry.duplicate();
    AddOperation addOp = new AddOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, tmp.getDN(),
        tmp.getObjectClasses(), tmp.getUserAttributes(),
        tmp.getOperationalAttributes());
    addOp.run();
    entryList.add(personEntry);
    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
      "The Add Entry operation failed");
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate();
      AddOperation addOp = new AddOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, tmp.getDN(),
          tmp.getObjectClasses(), tmp.getUserAttributes(),
          tmp.getOperationalAttributes());
      addOp.run();
      entryList.add(personEntry);
      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
        "The Add Entry operation failed");
    // Check if the client has received the msg
    SynchronizationMessage msg = broker.receive();
    assertTrue(msg instanceof AddMsg,
      "The received synchronization message is not an ADD msg");
    AddMsg addMsg =  (AddMsg) msg;
      // Check if the client has received the msg
      SynchronizationMessage msg = broker.receive();
      assertTrue(msg instanceof AddMsg,
        "The received synchronization message is not an ADD msg");
      AddMsg addMsg =  (AddMsg) msg;
    Operation receivedOp = addMsg.createOperation(connection);
    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
      "The received synchronization message is not an ADD msg");
      Operation receivedOp = addMsg.createOperation(connection);
      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
        "The received synchronization message is not an ADD msg");
    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
      "The received ADD synchronization message is not for the excepted DN");
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD synchronization message is not for the excepted DN");
    reader = new BrokerReader(broker);
    reader.start();
      reader = new BrokerReader(broker);
      reader.start();
    long startTime = TimeThread.getTime();
    int count = TOTAL_MESSAGES;
      long startTime = TimeThread.getTime();
      int count = TOTAL_MESSAGES;
    // Create a number of writer thread that will loop modifying the entry
    List<Thread> writerThreadList = new LinkedList<Thread>();
    for (int n = 0; n < 1; n++)
    {
      BrokerWriter writer = new BrokerWriter(count);
      writerThreadList.add(writer);
    }
    for (Thread thread : writerThreadList)
    {
      thread.start();
    }
    // wait for all the threads to finish.
    for (Thread thread : writerThreadList)
    {
      thread.join();
    }
      // Create a number of writer thread that will loop modifying the entry
      List<Thread> writerThreadList = new LinkedList<Thread>();
      for (int n = 0; n < 1; n++)
      {
        BrokerWriter writer = new BrokerWriter(count);
        writerThreadList.add(writer);
      }
      for (Thread thread : writerThreadList)
      {
        thread.start();
      }
      // wait for all the threads to finish.
      for (Thread thread : writerThreadList)
      {
        thread.join();
      }
    long afterSendTime = TimeThread.getTime();
      long afterSendTime = TimeThread.getTime();
    int rcvCount = reader.getCount();
    long afterReceiveTime = TimeThread.getTime();
      int rcvCount = reader.getCount();
      long afterReceiveTime = TimeThread.getTime();
    if (rcvCount != TOTAL_MESSAGES)
    {
      fail("some messages were lost : expected : " +TOTAL_MESSAGES +
           " received : " + rcvCount);
    }
      if (rcvCount != TOTAL_MESSAGES)
      {
        fail("some messages were lost : expected : " +TOTAL_MESSAGES +
            " received : " + rcvCount);
      }
    }
    finally {
    DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
    broker.stop();
      DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
      broker.stop();
    }
  }
@@ -393,7 +400,7 @@
    ServerState state = new ServerState(baseDn);
    state.loadState();
    ChangelogBroker broker = new ChangelogBroker(state, baseDn,
                                                 serverId, 0, 0, 0, 0);
                                                 serverId, 0, 0, 0, 0, 100);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:8989");
    broker.start(servers);
@@ -441,7 +448,7 @@
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the syncrhonized server");
        "Unable to add the synchronized server");
    entryList.add(synchroServerEntry);
  }
@@ -553,7 +560,9 @@
      {
        while (true)
        {
          broker.receive();
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
          count ++;
        }
      } catch (Exception e) {
@@ -577,7 +586,7 @@
          return count;
        try
        {
          this.wait();
          this.wait(60);
          return count;
        } catch (InterruptedException e)
        {
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
@@ -438,6 +438,73 @@
    // Check that retrieved CN is OK
    msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes());
  }
  @DataProvider(name="serverStart")
  public Object [][] createServerStartMessageTestData() throws Exception
  {
    DN baseDN = DN.decode("dc=example, dc=com");
    ServerState state = new ServerState(baseDN);
    return new Object [][] { {(short)1, baseDN, 100, state} };
  }
  /**
   * Test that ServerStartMessage encoding and decoding works
   * by checking that : msg == new ServerStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="serverStart")
  public void ServerStartMessageTest(short serverId, DN baseDN, int window,
         ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
        window, window, window, window, window, state);
    ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
  }
  @DataProvider(name="changelogStart")
  public Object [][] createChangelogStartMessageTestData() throws Exception
  {
    DN baseDN = DN.decode("dc=example, dc=com");
    ServerState state = new ServerState(baseDN);
    return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} };
  }
  /**
   * Test that changelogStartMessage encoding and decoding works
   * by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="changelogStart")
  public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
         String url, ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
        url, baseDN, window, state);
    ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
  }
  /**
   * Test that WindowMessageTest encoding and decoding works
   * by checking that : msg == new WindowMessageTest(msg.getBytes()).
   */
  @Test()
  public void WindowMessageTest() throws Exception
  {
    WindowMessage msg = new WindowMessage(123);
    WindowMessage newMsg = new WindowMessage(msg.getBytes());
    assertEquals(msg.getNumAck(), newMsg.getNumAck());
  }
  /**
   * Test PendingChange
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.*;
import java.net.SocketException;
@@ -325,6 +326,10 @@
  @Test(enabled=true)
  public void namingConflicts() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : namingConflicts" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
@@ -635,193 +640,198 @@
  @Test(enabled=true, dataProvider="assured")
  public void updateOperations(boolean assured) throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : updateOperations " + assured , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    cleanEntries();
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 27);
    try {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    // broker.setSoTimeout(100);
    try
    {
      while (true)
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        broker.receive();
        while (true)
        {
          broker.receive();
        }
      }
     }
    catch (Exception e)
    {
     // broker.setSoTimeout(1000);
    }
    /*
     * Test that operations done on this server are sent to the
     * changelog server and forwarded to our changelog broker session.
     */
      catch (Exception e)
      {}
      /*
       * Test that operations done on this server are sent to the
       * changelog server and forwarded to our changelog broker session.
       */
    // Create an Entry (add operation)
    Entry tmp = personEntry.duplicate();
    AddOperation addOp = new AddOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, tmp.getDN(),
        tmp.getObjectClasses(), tmp.getUserAttributes(),
        tmp.getOperationalAttributes());
    addOp.run();
    entryList.add(personEntry);
    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
      // Create an Entry (add operation)
      Entry tmp = personEntry.duplicate();
      AddOperation addOp = new AddOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, tmp.getDN(),
          tmp.getObjectClasses(), tmp.getUserAttributes(),
          tmp.getOperationalAttributes());
      addOp.run();
      entryList.add(personEntry);
      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
      "The Add Entry operation failed");
    // Check if the client has received the msg
    SynchronizationMessage msg = broker.receive();
    assertTrue(msg instanceof AddMsg,
      // Check if the client has received the msg
      SynchronizationMessage msg = broker.receive();
      assertTrue(msg instanceof AddMsg,
      "The received synchronization message is not an ADD msg");
    AddMsg addMsg =  (AddMsg) msg;
      AddMsg addMsg =  (AddMsg) msg;
    Operation receivedOp = addMsg.createOperation(connection);
    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
      Operation receivedOp = addMsg.createOperation(connection);
      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
      "The received synchronization message is not an ADD msg");
    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
      "The received ADD synchronization message is not for the excepted DN");
    // Modify the entry
    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
      // Modify the entry
      List<Modification> mods = generatemods("telephonenumber", "01 02 45");
    ModifyOperation modOp = new ModifyOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, personEntry.getDN(), mods);
    modOp.setInternalOperation(true);
    modOp.run();
      ModifyOperation modOp = new ModifyOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, personEntry.getDN(), mods);
      modOp.setInternalOperation(true);
      modOp.run();
    // See if the client has received the msg
    msg = broker.receive();
    assertTrue(msg instanceof ModifyMsg,
        "The received synchronization message is not a MODIFY msg");
    ModifyMsg modMsg = (ModifyMsg) msg;
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof ModifyMsg,
      "The received synchronization message is not a MODIFY msg");
      ModifyMsg modMsg = (ModifyMsg) msg;
    receivedOp = modMsg.createOperation(connection);
    assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
    "The received MODIFY synchronization message is not for the excepted DN");
      receivedOp = modMsg.createOperation(connection);
      assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
      "The received MODIFY synchronization message is not for the excepted DN");
    // Modify the entry DN
    DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
    ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, personEntry.getDN(), RDN
            .decode("uid=new person"), true, DN
            .decode("ou=People,dc=example,dc=com"));
    modDNOp.run();
    assertNotNull(DirectoryServer.getEntry(newDN),
        "The MOD_DN operation didn't create the new person entry");
    assertNull(DirectoryServer.getEntry(personEntry.getDN()),
        "The MOD_DN operation didn't delete the old person entry");
    entryList.add(DirectoryServer.getEntry(newDN));
      // Modify the entry DN
      DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
      ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, personEntry.getDN(), RDN
          .decode("uid=new person"), true, DN
          .decode("ou=People,dc=example,dc=com"));
      modDNOp.run();
      assertNotNull(DirectoryServer.getEntry(newDN),
      "The MOD_DN operation didn't create the new person entry");
      assertNull(DirectoryServer.getEntry(personEntry.getDN()),
      "The MOD_DN operation didn't delete the old person entry");
      entryList.add(DirectoryServer.getEntry(newDN));
    // See if the client has received the msg
    msg = broker.receive();
    assertTrue(msg instanceof ModifyDNMsg,
        "The received synchronization message is not a MODIFY DN msg");
    ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
    receivedOp = moddnMsg.createOperation(connection);
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof ModifyDNMsg,
      "The received synchronization message is not a MODIFY DN msg");
      ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
      receivedOp = moddnMsg.createOperation(connection);
    assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
        "The received MODIFY_DN message is not for the excepted DN");
      assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
      "The received MODIFY_DN message is not for the excepted DN");
    // Delete the entry
    Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
    DeleteOperation delOp = new DeleteOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, DN
            .decode("uid= new person,ou=People,dc=example,dc=com"));
    delOp.run();
    assertNull(DirectoryServer.getEntry(newDN),
        "Unable to delete the new person Entry");
    entryList.remove(newPersonEntry);
      // Delete the entry
      Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
      DeleteOperation delOp = new DeleteOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, DN
          .decode("uid= new person,ou=People,dc=example,dc=com"));
      delOp.run();
      assertNull(DirectoryServer.getEntry(newDN),
      "Unable to delete the new person Entry");
      entryList.remove(newPersonEntry);
    // See if the client has received the msg
    msg = broker.receive();
    assertTrue(msg instanceof DeleteMsg,
        "The received synchronization message is not a MODIFY DN msg");
    DeleteMsg delMsg = (DeleteMsg) msg;
    receivedOp = delMsg.createOperation(connection);
    assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
        .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
        "The received DELETE message is not for the excepted DN");
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof DeleteMsg,
      "The received synchronization message is not a MODIFY DN msg");
      DeleteMsg delMsg = (DeleteMsg) msg;
      receivedOp = delMsg.createOperation(connection);
      assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
          .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
      "The received DELETE message is not for the excepted DN");
    /*
     * Now check that when we send message to the Changelog server
     * and that they are received and correctly replayed by the server.
     *
     * Start by testing the Add message reception
     */
    addMsg = new AddMsg(gen.NewChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID, baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    if (assured)
      addMsg.setAssured();
    broker.publish(addMsg);
      /*
       * Now check that when we send message to the Changelog server
       * and that they are received and correctly replayed by the server.
       *
       * Start by testing the Add message reception
       */
      addMsg = new AddMsg(gen.NewChangeNumber(),
          personWithUUIDEntry.getDN().toString(),
          user1entryUUID, baseUUID,
          personWithUUIDEntry.getObjectClassAttribute(),
          personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
      if (assured)
        addMsg.setAssured();
      broker.publish(addMsg);
    /*
     * Check that the entry has been created in the local DS.
     */
    Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
    assertNotNull(resultEntry,
        "The send ADD synchronization message was not applied");
    entryList.add(resultEntry);
      /*
       * Check that the entry has been created in the local DS.
       */
      Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
      assertNotNull(resultEntry,
      "The send ADD synchronization message was not applied");
      entryList.add(resultEntry);
    /*
     * Test the reception of Modify Msg
     */
    modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
                           mods, user1entryUUID);
    if (assured)
      modMsg.setAssured();
    broker.publish(modMsg);
      /*
       * Test the reception of Modify Msg
       */
      modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
          mods, user1entryUUID);
      if (assured)
        modMsg.setAssured();
      broker.publish(modMsg);
    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
                           "telephonenumber", "01 02 45", 1000);
      boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
          "telephonenumber", "01 02 45", 1000);
    if (found == false)
     fail("The modification has not been correctly replayed.");
      if (found == false)
        fail("The modification has not been correctly replayed.");
    /*
     * Test the Reception of Modify Dn Msg
     */
    moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
                           gen.NewChangeNumber(),
                           user1entryUUID, null,
                           true, null, "uid= new person");
    if (assured)
      moddnMsg.setAssured();
    broker.publish(moddnMsg);
      /*
       * Test the Reception of Modify Dn Msg
       */
      moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
          gen.NewChangeNumber(),
          user1entryUUID, null,
          true, null, "uid= new person");
      if (assured)
        moddnMsg.setAssured();
      broker.publish(moddnMsg);
    resultEntry = getEntry(
        DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true);
      resultEntry = getEntry(
          DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true);
    assertNotNull(resultEntry,
        "The modify DN synchronization message was not applied");
      assertNotNull(resultEntry,
      "The modify DN synchronization message was not applied");
    /*
     * Test the Reception of Delete Msg
     */
    delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
                           gen.NewChangeNumber(), user1entryUUID);
    if (assured)
      delMsg.setAssured();
    broker.publish(delMsg);
    resultEntry = getEntry(
      /*
       * Test the Reception of Delete Msg
       */
      delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
          gen.NewChangeNumber(), user1entryUUID);
      if (assured)
        delMsg.setAssured();
      broker.publish(delMsg);
      resultEntry = getEntry(
          DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, false);
    assertNull(resultEntry,
        "The DELETE synchronization message was not replayed");
    broker.stop();
      assertNull(resultEntry,
      "The DELETE synchronization message was not replayed");
    }
    finally
    {
      broker.stop();
    }
  }
  /**
@@ -850,7 +860,7 @@
    ServerState state = new ServerState(baseDn);
    state.loadState();
    ChangelogBroker broker = new ChangelogBroker(state, baseDn,
                                                 serverId, 0, 0, 0, 0);
                                                 serverId, 0, 0, 0, 0, 100);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:8989");
    broker.start(servers);
@@ -996,6 +1006,10 @@
  @Test(enabled=true)
  public void deleteNoSuchObject() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : deleteNoSuchObject" , 1);
    DN dn = DN.decode("cn=No Such Object,ou=People,dc=example,dc=com");
    Operation op =
         new DeleteOperation(connection,
@@ -1014,12 +1028,17 @@
  @Test(enabled=true)
  public void infiniteReplayLoop() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : infiniteReplayLoop" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
    Thread.sleep(2000);
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 11);
    try
    {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
      // Create a test entry.
      String personLdif = "dn: uid=user.2,ou=People,dc=example,dc=com\n"