mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
17.42.2007 3eb214ba900a7f3e4550f6a047db26e7f40b82a8
Issue 605 Total Update over protocol
11 files added
18 files modified
5185 ■■■■■ changed files
opends/resource/schema/02-config.ldif 33 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/config/ConfigConstants.java 56 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/TaskMessages.java 22 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java 366 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java 63 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/common/LogMessages.java 65 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java 9 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java 122 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java 97 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java 1194 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java 122 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java 142 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java 188 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java 158 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java 212 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java 103 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java 27 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java 214 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTask.java 267 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java 1488 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 50 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java 112 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -1045,6 +1045,10 @@
  NAME 'ds-cfg-heartbeat-interval'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306
  NAME 'ds-cfg-changelog-db-directory'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307
  NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' )
@@ -1125,6 +1129,22 @@
  NAME 'ds-cfg-virtual-attribute-conflict-behavior'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332
  NAME 'ds-task-initialize-domain-dn'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333
  NAME 'ds-task-initialize-replica-server-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334
  NAME 'ds-task-unprocessed-entry-count'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335
  NAME 'ds-task-processed-entry-count'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.336
  NAME 'ds-cfg-dictionary-file' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' )
@@ -1481,7 +1501,8 @@
  'ds-cfg-synchronization-changelog-server-config' SUP top
  STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
  ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
  SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
  X-ORIGIN 'OpenDS Directory Server' )
@@ -1594,6 +1615,16 @@
  ds-cfg-virtual-attribute-conflict-behavior )
  MAY ( ds-cfg-virtual-attribute-base-dn $ ds-cfg-virtual-attribute-group-dn $
  ds-cfg-virtual-attribute-filter ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.92
  NAME 'ds-task-initialize-from-remote-replica' SUP ds-task
  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.93
  NAME 'ds-task-initialize-remote-replica' SUP ds-task
  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.95
  NAME 'ds-cfg-dictionary-password-validator' SUP ds-cfg-password-validator
  STRUCTURAL MUST ( ds-cfg-dictionary-file $ ds-cfg-case-sensitive-validation $
opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,6 +3665,62 @@
       NAME_PREFIX_TASK + "import-is-encrypted";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * initialize task definition.
   */
  public static final String OC_INITIALIZE_TASK =
    NAME_PREFIX_TASK + "initialize-from-remote-replica";
  /**
   * The name of the attribute in an initialize task definition that specifies
   * the base dn related to the synchonization domain to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN =
       NAME_PREFIX_TASK + "initialize-domain-dn";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the source in terms of source server from which to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_SOURCE =
       NAME_PREFIX_TASK + "initialize-replica-server-id";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * initialize target task definition.
   */
  public static final String OC_INITIALIZE_TARGET_TASK =
    NAME_PREFIX_TASK + "initialize-remote-replica";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the base dn related to the synchonization domain to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN =
       NAME_PREFIX_TASK + "initialize-domain-dn";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE =
       NAME_PREFIX_TASK + "initialize-replica-server-id";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_LEFT =
       NAME_PREFIX_TASK + "unprocessed-entry-count";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_DONE =
       NAME_PREFIX_TASK + "processed-entry-count";
  /**
   * The name of the objectclass that will be used for a Directory Server
opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,8 +211,6 @@
  public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17;
  /**
   * The message ID for the message that will be used an attempt is made to
   * invoke the index rebuild task by a user that does not have the required
@@ -221,6 +219,20 @@
  public static final int MSGID_TASK_INDEXREBUILD_INSUFFICIENT_PRIVILEGES =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18;
  /**
   * The message ID for the message that will be used when an invalid domain
   * base DN is provided as argument to the initialize target task.
   */
  public static final int  MSGID_TASK_INITIALIZE_TARGET_INVALID_DN =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19;
  /**
   * The message ID for the message that will be used when an invalid domain
   * base DN is provided as argument to the initialize task.
   */
  public static final int  MSGID_TASK_INITIALIZE_INVALID_DN =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 20;
  /**
@@ -292,6 +304,12 @@
    registerMessage(MSGID_TASK_INDEXREBUILD_INSUFFICIENT_PRIVILEGES,
                    "You do not have sufficient privileges to initiate an " +
                    "index rebuild.");
    registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN,
                    "Invalid DN provided with the Initialize Target task.");
    registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN,
                    "Invalid DN provided with the Initialize task.");
  }
}
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
  static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import com.sleepycat.je.DatabaseException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import com.sleepycat.je.DatabaseException;
/**
 * This class define an in-memory cache that will be used to store
 * the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
    }
  }
  /**
   * Send back an ack to the server that sent the change.
   * Retrieves the destination handlers for a routable message.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param msg The message to route.
   * @param senderHandler The handler of the server that published this message.
   * @return The list of destination handlers.
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  {
    short serverId = changeNumber.getServerId();
    sendAck(changeNumber, isLDAPserver, serverId);
  }
  /**
   *
   * Send back an ack to a server that sent the change.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param serverId     The identifier of the server from which we
   *                     received the change..
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
                      short serverId)
  {
    ServerHandler handler;
    if (isLDAPserver)
      handler = connectedServers.get(serverId);
    else
      handler = changelogServers.get(serverId);
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    // TODO : check for null handler and log error
    try
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      handler.sendAck(changeNumber);
    } catch (IOException e)
    {
      /*
       * An error happened trying the send back an ack to this server.
       * Log an error and close the connection to this server.
       */
      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
      String message = getMessage(msgID, this.toString())
                                  + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      handler.shutdown();
      // TODO Import from the "closest server" to be implemented
    }
  }
  /**
   * Shutdown this ChangelogCache.
   */
  public void shutdown()
  {
    // Close session with other changelogs
    for (ServerHandler serverHandler : changelogServers.values())
    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
    {
      serverHandler.shutdown();
    }
    // Close session with other LDAP servers
    for (ServerHandler serverHandler : connectedServers.values())
    {
      serverHandler.shutdown();
    }
    // Shutdown the dbHandlers
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      if (!senderHandler.isChangelogServer())
      {
        dbHandler.shutdown();
        // Send to all changelogServers
        for (ServerHandler destinationHandler : changelogServers.values())
        {
          servers.add(destinationHandler);
        }
      }
      sourceDbHandlers.clear();
      // Send to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
          continue;
        servers.add(destinationHandler);
      }
    }
    else
    {
      // Destination is one server
      ServerHandler destinationHandler =
        connectedServers.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
      }
      else
      {
        // the targeted server is NOT connected
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(changelogServers.values());
        }
      }
    }
    return servers;
  }
  /**
   * Returns the ServerState describing the last change from this replica.
   * Process an InitializeRequestMessage.
   *
   * @return The ServerState describing the last change from this replica.
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
   * the message.
   */
  public ServerState getDbServerState()
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    ServerState serverState = new ServerState();
    for (DbHandler db : sourceDbHandlers.values())
    {
      serverState.update(db.getLastChange());
    }
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    return "ChangelogCache " + baseDn;
  }
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    if (servers.isEmpty())
    {
      handler.checkWindow();
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
        }
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        }
      }
      return;
    }
    for (ServerHandler handler : connectedServers.values())
    for (ServerHandler targetHandler : servers)
    {
      handler.checkWindow();
      try
      {
        targetHandler.send(msg);
      }
      catch(IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
      }
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    /**
     * Send back an ack to the server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
    {
      if (!handler.restartAfterSaturation(sourceHandler))
      short serverId = changeNumber.getServerId();
      sendAck(changeNumber, isLDAPserver, serverId);
    }
    /**
     *
     * Send back an ack to a server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     * @param serverId     The identifier of the server from which we
     *                     received the change..
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
        short serverId)
    {
      ServerHandler handler;
      if (isLDAPserver)
        handler = connectedServers.get(serverId);
      else
        handler = changelogServers.get(serverId);
      // TODO : check for null handler and log error
      try
      {
        handler.sendAck(changeNumber);
      } catch (IOException e)
      {
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        handler.shutdown();
      }
    }
    /**
     * Shutdown this ChangelogCache.
     */
    public void shutdown()
    {
      // Close session with other changelogs
      for (ServerHandler serverHandler : changelogServers.values())
      {
        serverHandler.shutdown();
      }
      // Close session with other LDAP servers
      for (ServerHandler serverHandler : connectedServers.values())
      {
        serverHandler.shutdown();
      }
      // Shutdown the dbHandlers
      synchronized (sourceDbHandlers)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        {
          dbHandler.shutdown();
        }
        sourceDbHandlers.clear();
      }
    }
    /**
     * Returns the ServerState describing the last change from this replica.
     *
     * @return The ServerState describing the last change from this replica.
     */
    public ServerState getDbServerState()
    {
      ServerState serverState = new ServerState();
      for (DbHandler db : sourceDbHandlers.values())
      {
        serverState.update(db.getLastChange());
      }
      return serverState;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      return "ChangelogCache " + baseDn;
    }
    /**
     * Check if some server Handler should be removed from flow control state.
     * @throws IOException If an error happened.
     */
    public void checkAllSaturation() throws IOException
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.checkWindow();
      }
      for (ServerHandler handler : connectedServers.values())
      {
        handler.checkWindow();
      }
    }
    /**
     * Check if a server that was in flow control can now restart
     * sending updates.
     * @param sourceHandler The server that must be checked.
     * @return true if the server can restart sending changes.
     *         false if the server can't restart sending changes.
     */
    public boolean restartAfterSaturation(ServerHandler sourceHandler)
    {
      for (ServerHandler handler : changelogServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
      }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
      for (ServerHandler handler : connectedServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
      }
      return true;
    }
    return true;
  }
}
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -44,6 +46,18 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -51,17 +65,6 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +111,7 @@
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private short changelogId;
  /**
   * The time in milliseconds between heartbeats from the synchronization
@@ -155,6 +159,7 @@
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  {
    this.changelogId = changelogId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
@@ -1263,4 +1268,40 @@
  {
    return heartbeatInterval;
  }
  /**
   * Processes a routable message.
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
    changelogCache.process(msg, this);
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
    session.publish(msg);
  }
}
opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,6 +34,11 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
@@ -116,6 +121,33 @@
          WindowMessage windowMsg = (WindowMessage) msg;
          handler.updateWindow(windowMsg);
        }
        else if (msg instanceof InitializeRequestMessage)
        {
          InitializeRequestMessage initializeMsg =
            (InitializeRequestMessage) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof InitializeTargetMessage)
        {
          InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof EntryMessage)
        {
          EntryMessage entryMsg = (EntryMessage) msg;
          handler.process(entryMsg);
        }
        else if (msg instanceof DoneMessage)
        {
          DoneMessage doneMsg = (DoneMessage) msg;
          handler.process(doneMsg);
        }
        else if (msg instanceof ErrorMessage)
        {
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
      }
    } catch (IOException e)
    {
opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,13 +317,61 @@
    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
  /**
   * The message id for thedescription of the attribute used to configure
   * The message id for the description of the attribute used to configure
   * the purge delay of the Changelog Servers.
   */
  public static final int MSGID_PURGE_DELAY_ATTR =
    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
  /**
   * The message id for the error raised when export/import
   * is rejected due to an export/import already in progress.
   */
  public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44;
  /**
   * The message id for the error raised when import
   * is rejected due to an invalid source of data imported.
   */
  public static final int MSGID_INVALID_IMPORT_SOURCE =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45;
  /**
   * The message id for the error raised when export
   * is rejected due to an invalid target to export datas.
   */
  public static final int MSGID_INVALID_EXPORT_TARGET =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46;
  /**
   * The message id for the error raised when import/export message
   * cannot be routed to an up-and-running target in the domain.
   */
  public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47;
  /**
   * The message ID for the message that will be used when no domain
   * can be found matching the provided domain base DN.
   */
  public static final int  MSGID_NO_MATCHING_DOMAIN =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48;
  /**
   * The message ID for the message that will be used when no domain
   * can be found matching the provided domain base DN.
   */
  public static final int  MSGID_MULTIPLE_MATCHING_DOMAIN
       = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49;
  /**
   * The message ID for the message that will be used when the domain
   * belongs to a provider class that does not allow the export.
   */
  public static final int  MSGID_INVALID_PROVIDER =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50;
  /**
   * Register the messages from this class in the core server.
@@ -449,5 +497,20 @@
        " restored because changelog servers would not be able to refresh" +
        " LDAP servers with older versions of the data. A zero value" +
        " can be used to specify an infinite delay (or never purge).");
    MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
        "The current request is rejected due to an import or an export" +
        " already in progress for the same data.");
    MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE,
        "Invalid source for the import.");
    MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET,
        "Invalid target for the export.");
    MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
        "No reachable peer in the domain.");
    MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN,
        "No domain matches the base DN provided.");
    MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN,
        "Multiple domains match the base DN provided.");
    MessageHandler.registerMessage(MSGID_INVALID_PROVIDER,
        "The provider class does not allow the operation requested.");
  }
}
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -345,6 +347,10 @@
          {
            if (session != null)
            {
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  "Broker : connect closing session" , 1);
              session.close();
              session = null;
            }
@@ -498,6 +504,7 @@
      try
      {
        SynchronizationMessage msg = session.receive();
        if (msg instanceof WindowMessage)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
@@ -544,6 +551,11 @@
    connected = false;
    try
    {
      if (debugEnabled())
      {
        debugInfo("ChangelogBroker Stop Closing session");
      }
      session.close();
    } catch (IOException e)
    {}
@@ -682,4 +694,12 @@
  {
    return numLostConnections;
  }
  private void log(String message)
  {
    int    msgID   = MSGID_UNKNOWN_TYPE;
    logError(ErrorLogCategory.SYNCHRONIZATION,
           ErrorLogSeverity.SEVERE_ERROR,
           message, msgID);
  }
}
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,12 +27,14 @@
package org.opends.server.synchronization.plugin;
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import java.io.IOException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
/**
 * This class implements a thread to monitor heartbeat messages from the
 * synchronization server.  Each broker runs one of these threads.
@@ -103,6 +105,9 @@
        long lastReceiveTime = session.getLastReceiveTime();
        if (now > lastReceiveTime + 2 * heartbeatInterval)
        {
          debugInfo("Heartbeat monitor is closing the broker session " +
          "because it could not detect a heartbeat.");
          // Heartbeat is well overdue so the server is assumed to be dead.
          if (debugEnabled())
          {
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@
   *             Can be null is the request has no associated operation.
   * @return     The Synchronization domain for this DN.
   */
  private static SynchronizationDomain findDomain(DN dn, Operation op)
  public static SynchronizationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special synchronization code on Operation that are
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,7 +233,8 @@
    {
      int msgID = MSGID_ERROR_UPDATING_RUV;
      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
          op.toString(), op.getErrorMessage(), baseDn.toString());
          op.toString(), op.getErrorMessage(), baseDn.toString(),
          Thread.currentThread().getStackTrace());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java
New file
@@ -0,0 +1,122 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.plugin;
import java.io.IOException;
import java.io.InputStream;
/**
 * This class creates an input stream that can be used to read entries generated
 * by SynchroLDIF as if they were being read from another source like a file.
 */
public class SynchroLDIFInputStream
extends InputStream
{
  // Indicates whether this input stream has been closed.
  private boolean closed;
  // The synchronization domain associated to this import.
  SynchronizationDomain domain;
  /**
   * Creates a new SynchroLDIFInputStream that will import entries
   * for a synchronzation domain.
   *
   * @param domain The synchronization domain
   */
  public SynchroLDIFInputStream(SynchronizationDomain domain)
  {
    this.domain = domain;
    closed       = false;
  }
  /**
   * Closes this input stream so that no more data may be read from it.
   */
  public void close()
  {
    closed      = true;
  }
  /**
   * Reads data from this input stream.
   *
   * @param  b    The array into which the data should be read.
   * @param  off  The position in the array at which point the data read may be
   *              placed.
   * @param  len  The maximum number of bytes that may be read into the
   *              provided array.
   *
   * @return  The number of bytes read from the input stream into the provided
   *          array, or -1 if the end of the stream has been reached.
   *
   * @throws  IOException  If a problem has occurred while generating data for
   *                       use by this input stream.
   */
  public int read(byte[] b, int off, int len)
  throws IOException
  {
    if (closed)
      return -1;
    byte[] bytes = domain.receiveEntryBytes();
    if (bytes==null)
    {
      closed = true;
      return -1;
    }
    int l = bytes.length;
    for (int i =0; i<l; i++)
    {
      b[off+i] = bytes[i];
    }
    return l;
  }
  /**
   * Reads a single byte of data from this input stream.
   *
   * @return  The byte read from the input stream, or -1 if the end of the
   *          stream has been reached.
   *
   * @throws  IOException  If a problem has occurred while generating data for
   *                       use by this input stream.
   */
  public int read()
         throws IOException
  {
    // This method is not supposed to be called to make an LDIF import
    // for synchronization.
    throw new IOException("Not implemented");
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java
New file
@@ -0,0 +1,97 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.plugin;
import java.io.IOException;
import java.io.OutputStream;
/**
 * This class creates an output stream that can be used to export entries
 * to a synchonization domain.
 */
public class SynchroLDIFOutputStream
       extends OutputStream
{
  SynchronizationDomain domain;
  String entryBuffer = "";
  /**
   * Creates a new SynchroLDIFOutputStream related to a synchronization
   * domain.
   *
   * @param domain The synchronization domain
   */
  public SynchroLDIFOutputStream(SynchronizationDomain domain)
  {
    this.domain = domain;
  }
  /**
   * {@inheritDoc}
   */
  public void write(int i) throws IOException
  {
    throw new IOException("Invalid call");
  }
  /**
   * {@inheritDoc}
   */
  public void write(byte b[], int off, int len) throws IOException
  {
    int endOfEntryIndex;
    int startOfEntryIndex = off;
    int bytesToRead = len;
    while (true)
    {
      // if we have the bytes for an entry, let's make an entry and send it
      String ebytes = new String(b,startOfEntryIndex,bytesToRead);
      endOfEntryIndex = ebytes.indexOf("\n\n");
      if ( endOfEntryIndex >= 0 )
      {
        endOfEntryIndex += 2;
        entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
        // Send the entry
        domain.sendEntryLines(entryBuffer);
        startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
        entryBuffer = "";
        bytesToRead -= endOfEntryIndex;
        if (bytesToRead==0)
          break;
      }
      else
      {
        entryBuffer = new String(b, startOfEntryIndex, len);
        break;
      }
    }
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,43 +26,54 @@
 */
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_ABBR;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_FULL;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.ConfigMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
@@ -73,23 +84,35 @@
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -137,8 +160,96 @@
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  short serverId;
  private short serverId;
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   */
  private class IEContext
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    SynchroLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMessage.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry left to be processed
    long entryLeftCount = 0;
    // The exception raised when any
    DirectoryException exception = null;
    /**
     * Initializes the counters of the task with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initTaskCounters(long count)
    {
      entryCount = count;
      entryLeftCount = count;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
      }
    }
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     */
    public void updateTaskCounters()
    {
      entryLeftCount--;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
        }
      }
    }
    /**
     * Update the state of the task.
     */
    protected TaskState updateTaskCompletionState()
    {
      if (exception == null)
        return TaskState.COMPLETED_SUCCESSFULLY;
      else
        return TaskState.STOPPED_BY_ERROR;
    }
  }
  // The context related to an import or export being processed
  // Null when none is being processed.
  private IEContext ieContext = null;
  // The backend informations necessary to make an import or export.
  private Backend backend;
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
@@ -160,6 +271,7 @@
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -205,8 +317,6 @@
    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
  }
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   *
@@ -217,6 +327,7 @@
  public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
  {
    super("Synchronization flush");
    /*
     * read the centralized changelog server configuration
     * this is a multivalued attribute
@@ -397,6 +508,10 @@
        if (!receiveStatus)
          broker.suspendReceive();
      }
      // Retrieves the related backend and its config entry
      retrievesBackendInfos(baseDN);
    } catch (Exception e)
    {
     /* TODO should mark that changelog service is
@@ -803,9 +918,9 @@
  }
  /**
   * Receive an update message from the changelog.
   * Receives an update message from the changelog.
   * also responsible for updating the list of pending changes
   * @return the received message
   * @return the received message - null if none
   */
  public UpdateMessage receive()
  {
@@ -823,7 +938,7 @@
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
@@ -834,6 +949,56 @@
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
            }
            catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
            }
            catch(DirectoryException de)
            {
              // Return an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  changelog did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
@@ -876,7 +1041,9 @@
  public void receiveAck(AckMessage ack)
  {
    UpdateMessage update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    ChangeNumber changeNumber;
    changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    {
@@ -1105,7 +1272,7 @@
        synchronized (this)
        {
          this.wait(1000);
          if (!disabled )
          if (!disabled && !stateSavingDisabled )
          {
            // save the RUV
            state.save();
@@ -1151,6 +1318,8 @@
      this.notify();
    }
    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
    // stop the ChangelogBroker
    broker.stop();
  }
@@ -1857,6 +2026,7 @@
    return broker.getNumLostConnections();
  }
  /**
   * Check if the domain solve conflicts.
   *
@@ -1933,6 +2103,988 @@
    // Nothing is needed at the moment
  }
  /*
   * Total Update >>
   */
  /**
   * Receives bytes related to an entry in the context of an import to
   * initialize the domain (called by SynchronizationDomainLDIFInputStream).
   *
   * @return The bytes. Null when the Done or Err message has been received
   */
  public byte[] receiveEntryBytes()
  {
    SynchronizationMessage msg;
    while (true)
    {
      try
      {
        msg = broker.receive();
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        log("receiveEntryBytes: received " + msg);
        if (msg instanceof EntryMessage)
        {
          // FIXME
          EntryMessage entryMsg = (EntryMessage)msg;
          byte[] entryBytes = entryMsg.getEntryBytes().clone();
          ieContext.updateTaskCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          return null;
        }
        else if (msg instanceof ErrorMessage)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ieContext.exception = new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails() , errorMsg.getMsgID());
          return null;
        }
        else
        {
          // Other messages received during an import are trashed
        }
      }
      catch(Exception e)
      {
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            "received an unexpected message type" , 1, e);
      }
      return null;
    }
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * @param errorMsg The error message received.
   */
  protected void abandonImportExport(ErrorMessage errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
          errorMsg.getDetails() , errorMsg.getMsgID());
      if (ieContext.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
        ieContext = null;
      }
    }
  }
  /**
   * Clears all the entries from the JE backend determined by the
   * be id passed into the method.
   *
   * @param  createBaseEntry  Indicate whether to automatically create the base
   *                          entry and add it to the backend.
   * @param beID  The be id to clear.
   * @param dn   The suffix of the backend to create if the the createBaseEntry
   *             boolean is true.
   * @throws Exception  If an unexpected problem occurs.
   */
  public static void clearJEBackend(boolean createBaseEntry, String beID,
      String dn) throws Exception
  {
    BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
    DN[] baseDNs = backend.getBaseDNs();
    // FIXME Should getConfigEntry be part of TaskUtils ?
    ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
    // FIXME Should setBackendEnabled be part of TaskUtils ?
    TaskUtils.setBackendEnabled(configEntry, false);
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
      {
        throw new RuntimeException(failureReason.toString());
      }
      try
      {
        backend.clearBackend(configEntry, baseDNs);
      }
      finally
      {
        LockFileManager.releaseLock(lockFile, failureReason);
      }
    }
    finally
    {
      TaskUtils.setBackendEnabled(configEntry, true);
    }
    if (createBaseEntry)
    {
      DN baseDN = DN.decode(dn);
      Entry e = createEntry(baseDN);
      backend = (BackendImpl)DirectoryServer.getBackend(beID);
      backend.addEntry(e, null);
    }
  }
  /**
   * Log debug message.
   * @param message The message to log.
   */
  private void log(String message)
  {
    if (debugEnabled())
    {
      debugInfo("DebugInfo" + message);
      int    msgID   = MSGID_UNKNOWN_TYPE;
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationDomain/ " + message, msgID);
    }
  }
  /**
   * Export the entries.
   * @throws DirectoryException when an error occured
   */
  protected void exportBackend() throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            String.valueOf(failureReason));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message + " " + stackTraceToSingleLineString(e), msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    //  Launch the export.
    try
    {
      DN[] baseDNs = {this.baseDN};
      backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, de.getErrorMessage());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, stackTraceToSingleLineString(e));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    finally
    {
      //  Clean up after the export by closing the export config.
      exportConfig.close();
      //  Release the shared lock on the backend.
      try
      {
        String lockFile = LockFileManager.getBackendLockFileName(backend);
        StringBuilder failureReason = new StringBuilder();
        if (! LockFileManager.releaseLock(lockFile, failureReason))
        {
          int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
          String message = getMessage(msgID, backend.getBackendID(),
              String.valueOf(failureReason));
          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
              message, msgID);
          throw new DirectoryException(
              ResultCode.OTHER, message, msgID, null);
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            stackTraceToSingleLineString(e));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
  }
  /**
   * Retrieves the backend object related to the domain and the backend's
   * config entry. They will be used for import and export.
   * TODO This should be in a shared package rather than here.
   *
   * @param baseDN The baseDN to retrieve the backend
   * @throws DirectoryException when an error occired
   */
  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
  {
    ArrayList<Backend>     backendList = new ArrayList<Backend>();
    ArrayList<ConfigEntry> entryList   = new ArrayList<ConfigEntry>();
    ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
    Backend backend = null;
    ConfigEntry backendConfigEntry = null;
    List<DN> branches = new ArrayList<DN>(0);
    // Retrieves the backend related to this domain
    Backend domainBackend = DirectoryServer.getBackend(baseDN);
    if (domainBackend == null)
    {
      int    msgID   = MSGID_CANNOT_DECODE_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    // Retrieves its config entry and its DNs
    int code = getBackends(backendList, entryList, dnList);
    if (code != 0)
    {
      int    msgID   = MSGID_CANNOT_DECODE_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    int numBackends = backendList.size();
    for (int i=0; i < numBackends; i++)
    {
      Backend b = backendList.get(i);
      if (domainBackend.getBackendID() != b.getBackendID())
      {
        continue;
      }
      if (backend == null)
      {
        backend = domainBackend;
        backendConfigEntry = entryList.get(i).duplicate();
        branches = dnList.get(i);
      }
      else
      {
        int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
        String message = getMessage(msgID, domainBackend.getBackendID());
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    if (backend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
      String message = getMessage(msgID, domainBackend.getBackendID());
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    else if (! backend.supportsLDIFExport())
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_IMPORT;
      String message = getMessage(msgID, 0); // FIXME
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    this.backend = backend;
    this.backendConfigEntry = backendConfigEntry;
    this.branches = branches;
  }
  /**
   * Sends lDIFEntry entry lines to the export target currently set.
   *
   * @param lDIFEntry The lines for the LDIF entry.
   * @throws IOException when an error occured.
   */
  public void sendEntryLines(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // we just let down the export.
    if (ieContext.exception != null)
    {
      IOException ioe = new IOException(ieContext.exception.getMessage());
      ieContext = null;
      throw ioe;
    }
    // new entry then send the current one
    EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    ieContext.updateTaskCounters();
  }
  /**
   * Retrieves information about the backends defined in the Directory Server
   * configuration.
   *
   * @param  backendList  A list into which instantiated (but not initialized)
   *                      backend instances will be placed.
   * @param  entryList    A list into which the config entries associated with
   *                      the backends will be placed.
   * @param  dnList       A list into which the set of base DNs for each backend
   *                      will be placed.
   */
  private static int getBackends(ArrayList<Backend> backendList,
                                 ArrayList<ConfigEntry> entryList,
                                 ArrayList<List<DN>> dnList)
  throws DirectoryException
  {
    //  Get the base entry for all backend configuration.
    DN backendBaseDN = null;
    try
    {
      backendBaseDN = DN.decode(DN_BACKEND_BASE);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_CANNOT_DECODE_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_CANNOT_DECODE_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    ConfigEntry baseEntry = null;
    try
    {
      baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
    }
    catch (ConfigException ce)
    {
      int    msgID   = MSGID_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    //  Iterate through the immediate children, attempting to parse them as
    //  backends.
    for (ConfigEntry configEntry : baseEntry.getChildren().values())
    {
      // Get the backend ID attribute from the entry.  If there isn't one, then
      // skip the entry.
      String backendID = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
        StringConfigAttribute idStub =
          new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
              true, false, true);
        StringConfigAttribute idAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
        if (idAttr == null)
        {
          continue;
        }
        else
        {
          backendID = idAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      //    Get the backend class name attribute from the entry.  If there isn't
      //    one, then just skip the entry.
      String backendClassName = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
        StringConfigAttribute classStub =
          new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
              true, false, false);
        StringConfigAttribute classAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
        if (classAttr == null)
        {
          continue;
        }
        else
        {
          backendClassName = classAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Class backendClass = null;
      try
      {
        backendClass = Class.forName(backendClassName);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_CANNOT_LOAD_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Backend backend = null;
      try
      {
        backend = (Backend) backendClass.newInstance();
        backend.setBackendID(backendID);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_CANNOT_INSTANTIATE_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      // Get the base DN attribute from the entry.  If there isn't one, then
      // just skip this entry.
      List<DN> baseDNs = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
        DNConfigAttribute baseDNStub =
          new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
              true, true, true);
        DNConfigAttribute baseDNAttr =
          (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
        if (baseDNAttr == null)
        {
          msgID = MSGID_NO_BASES_FOR_BACKEND;
          String message = getMessage(msgID,
              String.valueOf(configEntry.getDN()));
          throw new DirectoryException(
              DirectoryServer.getServerErrorResultCode(), message,msgID, null);
        }
        else
        {
          baseDNs = baseDNAttr.activeValues();
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_CANNOT_DETERMINE_BASES_FOR_BACKEND;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      backendList.add(backend);
      entryList.add(configEntry);
      dnList.add(baseDNs);
    }
    return 0;
      }
  /**
   * Initializes this domain from another source server.
   *
   * @param source The source from which to initialize
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initialize(short source, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.initializeTask = initTask;
    InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
        baseDN, serverId, source);
    // Publish Init request msg
    broker.publish(initializeMsg);
    // .. we expect to receive entries or err after that
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representaing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    short  source = 0;
    Throwable cause = null;
    try
    {
      source = Integer.decode(sourceString).shortValue();
      if (source >= -1)
      {
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        log("Source decoded for import:" + source);
        return source;
      }
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param targetString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeTarget(String targetString)
  throws DirectoryException
  {
    short  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMessage.ALL_SERVERS;
    }
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString).shortValue();
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
      }
      return target;
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  private synchronized void acquireIEContext()
  throws DirectoryException
  {
    if (ieContext != null)
    {
      // Rejects 2 simultaneous exports
      int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
      String message = getMessage(msgID);
      throw new DirectoryException(ResultCode.OTHER,
          message, msgID);
    }
    ieContext = new IEContext();
  }
  private synchronized void releaseIEContext()
  {
    ieContext = null;
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument.
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, Task initTask)
  throws DirectoryException
  {
    initializeTarget(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization has been
   * initiated by another server than this one.
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.exportTarget = target;
    ieContext.initializeTask = initTask;
    ieContext.initTaskCounters(backend.getEntryCount());
    // Send start message
    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
        baseDN, serverId, ieContext.exportTarget, requestorID,
        ieContext.entryLeftCount);
    log("SD : publishes " + initializeMessage +
        " for #entries=" + ieContext.entryCount);
    broker.publish(initializeMessage);
    // make an export and send entries
    exportBackend();
    // Successfull termnation
    DoneMessage doneMsg = new DoneMessage(serverId,
      initializeMessage.getDestination());
    broker.publish(doneMsg);
    if (ieContext != null)
    {
      ieContext.updateTaskCompletionState();
      ieContext = null;
    }
  }
  /**
   * Process backend before import.
   * @param backend The backend.
   * @param backendConfigEntry The config entry of the backend.
   * @throws Exception
   */
  private void preBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws Exception
  {
    // Stop saving state
    stateSavingDisabled = true;
    // Clear the backend
    clearJEBackend(false,backend.getBackendID(),null);
    // FIXME setBackendEnabled should be part of TaskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, false);
    // Acquire an exclusive lock for the backend.
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      throw new DirectoryException(ResultCode.OTHER, message, msgID);
    }
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void importBackend(InitializeTargetMessage initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    try
    {
      log("startImport");
      if (initializeMessage.getRequestorID() == serverId)
      {
        // The import responds to a request we did so the IEContext
        // is already acquired
      }
      else
      {
        acquireIEContext();
      }
      ieContext.importSource = initializeMessage.getsenderID();
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initTaskCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend, this.backendConfigEntry);
      DN[] baseDNs = {baseDN};
      ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
      importConfig =
        new LDIFImportConfig(ieContext.ldifImportInputStream);
      importConfig.setIncludeBranches(this.branches);
      // TODO How to deal with rejected entries during the import
      // importConfig.writeRejectedEntries("rejectedImport",
      // ExistingFileBehavior.OVERWRITE);
      // Process import
      this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
      stateSavingDisabled = false;
      // Re-exchange state with SS
      broker.stop();
      broker.start(changelogServers);
    }
    catch(Exception e)
    {
      throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
          2);// FIXME
    }
    finally
    {
      // Cleanup
      importConfig.close();
      // Re-enable backend
      closeBackendImport(this.backend, this.backendConfigEntry);
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      {
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
      }
      releaseIEContext();
      log("End importBackend");
    }
    // Success
  }
  /**
   * Make post import operations.
   * @param backend The backend implied in the import.
   * @param backendConfigEntry The config entry of the backend.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void closeBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws DirectoryException
  {
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    // Release lock
    if (!LockFileManager.releaseLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      new DirectoryException(ResultCode.OTHER, message, msgID);
    }
    // FIXME setBackendEnabled should be part taskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, true);
  }
  /**
   * Retrieves a synchronization domain based on the baseDN.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @return The domain retrieved
   * @throws DirectoryException When an error occured.
   */
  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
  throws DirectoryException
  {
    SynchronizationDomain synchronizationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterSynchronization))
      {
        int msgID = LogMessages.MSGID_INVALID_PROVIDER;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      // From the domainDN retrieves the synchronization domain
      SynchronizationDomain sdomain =
        MultimasterSynchronization.findDomain(baseDN, null);
      if (sdomain == null)
      {
        int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
        String message = getMessage(msgID) + " " + baseDN;
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      if (synchronizationDomain != null)
      {
        // Should never happen
        int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      synchronizationDomain = sdomain;
    }
    return synchronizationDomain;
  }
  /**
   * Returns the backend associated to this domain.
   * @return The associated backend.
   */
  public Backend getBackend()
  {
    return backend;
  }
  /**
   * Returns a boolean indiciating if an import or export is currently
   * processed.
   * @return The status
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
  }
  /*
   * <<Total Update
   */
  /**
   * Push the modifications contain the in given parameter has
   * a modification that would happen on a local server.
opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java
New file
@@ -0,0 +1,122 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is part of the synchronization protocol.
 * This message is sent by a server to one or several other servers after the
 * last entry sent in the context of a total update and signals to the server
 * that receives it that the export is now finished.
 */
public class DoneMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = 5216659571724730361L;
  /**
   * Creates a message.
   *
   * @param sender The sender server of this message.
   * @param destination The server or servers targetted by this message.
   */
  public DoneMessage(short sender, short destination)
  {
    super(sender, destination);
  }
  /**
   * Creates a new message by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the message,
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   */
  public DoneMessage(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_DONE)
        throw new DataFormatException("input is not a valid DoneMessage");
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Short.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_DONE;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java
New file
@@ -0,0 +1,142 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is part of the synchronization protocol.
 * This message is sent by a server to one or several other servers and
 * contain one entry to be sent over the protocol in the context of
 * an import/export over the protocol.
 */
public class EntryMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = 6116955858351992926L;
  // The byte array containing the bytes of the entry transported
  private byte[] entryByteArray;
  /**
   * Creates a new EntryMessage.
   *
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @param entryBytes The bytes of the entry.
   */
  public EntryMessage(short sender, short destination, byte[] entryBytes)
  {
    super(sender, destination);
    this.entryByteArray = entryBytes.clone();
  }
  /**
   * Creates a new EntryMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public EntryMessage(byte[] in) throws DataFormatException
  {
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ENTRY)
        throw new DataFormatException("input is not a valid ServerStart msg");
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderIDString = new String(in, pos, length, "UTF-8");
      this.senderID = Short.valueOf(senderIDString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
      // entry
      length = getNextLength(in, pos);
      this.entryByteArray = new byte[length];
      for (int i=0; i<length; i++)
      {
        entryByteArray[i] = in[pos+i];
      }
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Returns the entry bytes.
   * @return The entry bytes.
   */
  public byte[] getEntryBytes()
  {
    return entryByteArray;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] entryBytes = entryByteArray;
      int length = 1 + senderBytes.length +
                   1 + destinationBytes.length +
                   1 + entryBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_ENTRY;
      int pos = 1;
      pos = addByteArray(senderBytes, resultByteArray, pos);
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      pos = addByteArray(entryBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java
New file
@@ -0,0 +1,188 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is part of the synchronization protocol.
 * This message is sent by a server or a changelog server when an error
 * is detected in the context of a total update.
 */
public class ErrorMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = 2726389860247088266L;
  // Specifies the messageID built form the error that was detected
  private int msgID;
  // Specifies the complementary details about the error that was detected
  private String details = null;
  /**
   * Create a InitializeMessage.
   * @param sender The server ID of the server that send this message.
   * @param destination The destination server or servers of this message.
   * @param msgID The error message ID.
   * @param details The details of the error.
   */
  public ErrorMessage(short sender, short destination, int msgID,
      String details)
  {
    super(sender, destination);
    this.msgID  = msgID;
    this.details = details;
  }
  /**
   * Create a InitializeMessage.
   *
   * @param destination changelog server id
   * @param msgID error message ID
   * @param details details of the error
   */
  public ErrorMessage(short destination, int msgID, String details)
  {
    super((short)-2, destination);
    this.msgID  = msgID;
    this.details = details;
  }
  /**
   * Creates a new InitializeMessage by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the Message
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public ErrorMessage(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ERROR)
        throw new DataFormatException("input is not a valid InitializeMessage");
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Short.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      destination = Short.valueOf(serverIdString);
      pos += length +1;
      // MsgID
      length = getNextLength(in, pos);
      String msgIdString = new String(in, pos, length, "UTF-8");
      msgID = Integer.valueOf(msgIdString);
      pos += length +1;
      // Details
      length = getNextLength(in, pos);
      details = new String(in, pos, length, "UTF-8");
      pos += length +1;
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the base DN from this InitializeMessage.
   *
   * @return the base DN from this InitializeMessage.
   */
  public String getDetails()
  {
    return details;
  }
  /**
   * Get the base DN from this InitializeMessage.
   *
   * @return the base DN from this InitializeMessage.
   */
  public int getMsgID()
  {
    return msgID;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    /* The InitializeMessage is stored in the form :
     * <operation type><basedn><serverid>
     */
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
      byte[] byteDetails = details.getBytes("UTF-8");
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteErrMsgId.length + 1
                     + byteDetails.length + 1;
      byte[] resultByteArray = new byte[length];
      // put the type of the operation
      resultByteArray[0] = MSG_TYPE_ERROR;
      int pos = 1;
      // sender
      pos = addByteArray(byteSender, resultByteArray, pos);
      // destination
      pos = addByteArray(byteDestination, resultByteArray, pos);
      // MsgId
      pos = addByteArray(byteErrMsgId, resultByteArray, pos);
      // details
      pos = addByteArray(byteDetails, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java
New file
@@ -0,0 +1,158 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is part of the synchronization protocol.
 * This message is sent by a server to another server in order to
 * request this other server to do an export to the server sender
 * of this message.
 */
public class InitializeRequestMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = 8303271162942249215L;
  private String baseDn = null;
  /**
   * Creates a InitializeRequestMessage message.
   *
   * @param baseDn The base DN of the synchronization domain.
   * @param destination destination of this message
   * @param senderID serverID of the server that will send this message
   */
  public InitializeRequestMessage(DN baseDn, short senderID, short destination)
  {
    super(senderID, destination);
    this.baseDn = baseDn.toNormalizedString();
  }
  /**
   * Creates a new InitializeRequestMessage by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the Message
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeRequestMessage(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_INITIALIZE_REQUEST)
        throw new DataFormatException(
            "input is not a valid InitializeRequestMessage");
      int pos = 1;
      // baseDn
      int length = getNextLength(in, pos);
      baseDn = new String(in, pos, length, "UTF-8");
      pos += length +1;
      // sender
      length = getNextLength(in, pos);
      String sourceServerIdString = new String(in, pos, length, "UTF-8");
      senderID = Short.valueOf(sourceServerIdString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationServerIdString = new String(in, pos, length, "UTF-8");
      destination = Short.valueOf(destinationServerIdString);
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the base DN from this InitializeRequestMessage.
   *
   * @return the base DN from this InitializeRequestMessage.
   */
  public DN getBaseDn()
  {
    if (baseDn == null)
      return null;
    try
    {
      return DN.decode(baseDn);
    } catch (DirectoryException e)
    {
      return null;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try {
      byte[] baseDNBytes = baseDn.getBytes("UTF-8");
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).
      getBytes("UTF-8");
      int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
        + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      // type of the operation
      resultByteArray[0] = MSG_TYPE_INITIALIZE_REQUEST;
      int pos = 1;
      // baseDN
      pos = addByteArray(baseDNBytes, resultByteArray, pos);
      // sender
      pos = addByteArray(senderBytes, resultByteArray, pos);
      // destination
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java
New file
@@ -0,0 +1,212 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is part of the synchronization protocol.
 * This message is sent by a server to one or several servers as the
 * first message of an export, before sending the entries.
 */
public class InitializeTargetMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = -2122460559739139735L;
  private String baseDN = null;
  // Specifies the number of entries expected to be exported.
  private long entryCount;
  // Specifies the serverID of the server that requested this export
  // to happen. It allows a server that previously sent an
  // InitializeRequestMessage to know that the current message
  // is related to its own request.
  private short requestorID;
  /**
   * Creates a InitializeDestinationMessage.
   *
   * @param baseDN The base DN for which the InitializeMessage is created.
   * @param senderID The serverID of the server that sends this message.
   * @param destination The destination of this message.
   * @param requestorID The server that initiates this export.
   * @param entryCount The count of entries that will be sent.
   */
  public InitializeTargetMessage(DN baseDN, short senderID,
      short destination, short requestorID, long entryCount)
  {
    super(senderID, destination);
    this.requestorID = requestorID;
    this.baseDN = baseDN.toNormalizedString();
    this.entryCount = entryCount;
  }
  /**
   * Creates an InitializeTargetMessage by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the Message
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeTargetMessage(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_INITIALIZE_TARGET)
        throw new DataFormatException(
            "input is not a valid InitializeDestinationMessage");
      int pos = 1;
      // destination
      int length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
      // baseDn
      length = getNextLength(in, pos);
      baseDN = new String(in, pos, length, "UTF-8");
      pos += length +1;
      // sender
      length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Short.valueOf(senderString);
      pos += length +1;
      // requestor
      length = getNextLength(in, pos);
      String requestorString = new String(in, pos, length, "UTF-8");
      requestorID = Short.valueOf(requestorString);
      pos += length +1;
      // entryCount
      length = getNextLength(in, pos);
      String entryCountString = new String(in, pos, length, "UTF-8");
      entryCount = Long.valueOf(entryCountString);
      pos += length +1;
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the number of entries expected to be sent during the export.
   * @return the entry count
   */
  public long getEntryCount()
  {
    return this.entryCount;
  }
  /**
   * Get the serverID of the server that initiated the export.
   * @return the serverID
   */
  public long getRequestorID()
  {
    return this.requestorID;
  }
  /**
   * Get the base DN of the domain.
   *
   * @return the base DN
   */
  public DN getBaseDN()
  {
    if (baseDN == null)
      return null;
    try
    {
      return DN.decode(baseDN);
    } catch (DirectoryException e)
    {
      return null;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteDn = baseDN.getBytes("UTF-8");
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
      byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
      int length = 1 + byteDestination.length + 1
                     + byteDn.length + 1
                     + byteSender.length + 1
                     + byteRequestor.length + 1
                     + byteEntryCount.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_INITIALIZE_TARGET;
      int pos = 1;
      /* put the destination */
      pos = addByteArray(byteDestination, resultByteArray, pos);
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the sender */
      pos = addByteArray(byteSender, resultByteArray, pos);
      /* put the requestorID */
      pos = addByteArray(byteRequestor, resultByteArray, pos);
      /* put the entryCount */
      pos = addByteArray(byteEntryCount, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java
New file
@@ -0,0 +1,103 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.io.Serializable;
/**
 * This is an abstract class of messages of the synchronization protocol
 * for message that needs to contain information about the server that
 * send them and the destination servers to whitch they should be sent.
 */
public abstract class RoutableMessage extends SynchronizationMessage implements
    Serializable
{
  /**
   *  Special values for the server ids fields contained in the routable
   *  messages.
   **/
  /**
   *  Specifies that no server is identified.
   */
  public static final short UNKNOWN_SERVER      = -1;
  /**
   * Specifies all servers in the synchronization domain.
   */
  public static final short ALL_SERVERS         = -2;
  /**
   * Inside a topology of servers in the same domain, it specifies
   * the server that is the "closest" to the sender.
   */
  public static final short THE_CLOSEST_SERVER  = -3;
  /**
   * The destination server or servers of this message.
   */
  protected short destination = UNKNOWN_SERVER;
  /**
   * The serverID of the server that sends this message.
   */
  protected short senderID = UNKNOWN_SERVER;
  /**
   * Creates a routable message.
   * @param senderID changelog server id
   * @param destination changelog server id
   */
  public RoutableMessage(short senderID, short destination)
  {
    this.senderID = senderID;
    this.destination = destination;
  }
  /**
   * Creates a routable message.
   */
  public RoutableMessage()
  {
  }
  /**
   * Get the destination.
   * @return the destination
   */
  public short getDestination()
  {
    return this.destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   * @return the server id
   */
  public short getsenderID()
  {
    return this.senderID;
  }
}
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,11 +112,16 @@
    /* Read the first 8 bytes containing the packet length */
    int length = 0;
    /* Let's start the stop-watch before waiting on read */
    /* for the heartbeat check to be operationnal        */
    lastReceiveTime = System.currentTimeMillis();
    while (length<8)
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      if (read == -1)
      {
        lastReceiveTime=0;
        throw new IOException("no more data");
      }
      else
@@ -135,8 +140,9 @@
      {
        length += input.read(buffer, length, totalLength - length);
      }
      lastReceiveTime = System.currentTimeMillis();
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return SynchronizationMessage.generateMsg(buffer);
    }
    catch (OutOfMemoryError e)
@@ -159,6 +165,10 @@
   */
  public long getLastReceiveTime()
  {
    if (lastReceiveTime==0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,6 +47,13 @@
  static final byte MSG_TYPE_CHANGELOG_START = 7;
  static final byte MSG_TYPE_WINDOW = 8;
  static final byte MSG_TYPE_HEARTBEAT = 9;
  static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
  static final byte MSG_TYPE_INITIALIZE_TARGET = 11;
  static final byte MSG_TYPE_ENTRY = 12;
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
  /**
   * Return the byte[] representation of this message.
@@ -60,6 +67,11 @@
   * MSG_TYPE_CHANGELOG_START
   * MSG_TYPE_WINDOW
   * MSG_TYPE_HEARTBEAT
   * MSG_TYPE_INITIALIZE
   * MSG_TYPE_INITIALIZE_TARGET
   * MSG_TYPE_ENTRY
   * MSG_TYPE_DONE
   * MSG_TYPE_ERROR
   *
   * @return the byte[] representation of this message.
   */
@@ -107,6 +119,21 @@
      case MSG_TYPE_HEARTBEAT:
        msg = new HeartbeatMessage(buffer);
      break;
      case MSG_TYPE_INITIALIZE_REQUEST:
        msg = new InitializeRequestMessage(buffer);
      break;
      case MSG_TYPE_INITIALIZE_TARGET:
        msg = new InitializeTargetMessage(buffer);
      break;
      case MSG_TYPE_ENTRY:
        msg = new EntryMessage(buffer);
      break;
      case MSG_TYPE_DONE:
        msg = new DoneMessage(buffer);
      break;
      case MSG_TYPE_ERROR:
        msg = new ErrorMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
New file
@@ -0,0 +1,214 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.tasks;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.CoreMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.messages.TaskMessages;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.synchronization.plugin.SynchronizationDomain;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
/**
 * This class provides an implementation of a Directory Server task that can
 * be used to import data from an LDIF file into a backend.
 */
public class InitializeTargetTask extends Task
{
  // Config properties
  boolean append                  = false;
  boolean isCompressed            = false;
  boolean isEncrypted             = false;
  boolean skipSchemaValidation    = false;
  String  domainString            = null;
  SynchronizationDomain domain = null;
  short target;
  long total;
  long left;
  /**
   * {@inheritDoc}
   */
  @Override public void initializeTask() throws DirectoryException
  {
    // FIXME -- Do we need any special authorization here?
    Entry taskEntry = getTaskEntry();
    AttributeType typeDomainBase;
    AttributeType typeScope;
    typeDomainBase =
      getAttributeType(ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN, true);
    typeScope =
      getAttributeType(ATTR_TASK_INITIALIZE_TARGET_SCOPE, true);
    List<Attribute> attrList;
    attrList = taskEntry.getAttribute(typeDomainBase);
    domainString = TaskUtils.getSingleValueString(attrList);
    DN domainDN = DN.nullDN();
    try
    {
      domainDN = DN.decode(domainString);
    }
    catch(Exception e)
    {
      int    msgID   = TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN;
      String message = getMessage(msgID) + e.getMessage();
      throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX,
          message, msgID);
    }
    domain=SynchronizationDomain.retrievesSynchronizationDomain(domainDN);
    attrList = taskEntry.getAttribute(typeScope);
    String targetString = TaskUtils.getSingleValueString(attrList);
    target = domain.decodeTarget(targetString);
    createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
    createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
  }
  /**
   * {@inheritDoc}
   */
  protected TaskState runTask()
  {
    if (debugEnabled())
    {
      debugInfo("DebugInfo" + "InitializeTarget Task/runTask ");
    }
    try
    {
      domain.initializeTarget(target, this);
    }
    catch(DirectoryException de)
    {
      logError(ErrorLogCategory.TASK,
          ErrorLogSeverity.SEVERE_ERROR,
          "Initialize Task stopped by error", 1);
      return TaskState.STOPPED_BY_ERROR;
    }
    return TaskState.COMPLETED_SUCCESSFULLY;
  }
  /**
   * Create attribute to store entry counters.
   * @param name The name of the attribute.
   * @param value The value to store for that attribute.
   */
  protected void createCounterAttribute(String name, long value)
  {
    AttributeType type;
    LinkedHashSet<AttributeValue> values =
      new LinkedHashSet<AttributeValue>();
    Entry taskEntry = getTaskEntry();
    try
    {
      type = getAttributeType(name, true);
      values.add(new AttributeValue(type,
          new ASN1OctetString(String.valueOf(value))));
      ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
      attrList.add(new Attribute(type, name,values));
      taskEntry.putAttribute(type, attrList);
    }
    finally
    {
      // taskScheduler.unlockEntry(taskEntryDN, lock);
    }
  }
  /**
   * Set the total number of entries expected to be exported.
   * @param total The total number of entries.
   */
  public void setTotal(long total)
  {
    this.total = total;
    try
    {
      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total);
      updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
    }
    catch(Exception e) {}
  }
  /**
   * Set the total number of entries still to be exported.
   * @param left The total number of entries to be exported.
   */
  public void setLeft(long left)
  {
    this.left = left;
    try
    {
      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
      updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
    }
    catch(Exception e) {}
  }
  /**
   * Update an attribute for this task.
   * @param name The name of the attribute.
   * @param value The value.
   * @throws DirectoryException When an error occurs.
   */
  protected void updateAttribute(String name, long value)
  throws DirectoryException
  {
    Entry taskEntry = getTaskEntry();
    ArrayList<Modification> modifications = new ArrayList<Modification>();
    modifications.add(new Modification(ModificationType.REPLACE,
        new Attribute(name, String.valueOf(value))));
    taskEntry.applyModifications(modifications);
  }
}
opends/src/server/org/opends/server/tasks/InitializeTask.java
New file
@@ -0,0 +1,267 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.tasks;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.CoreMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.messages.TaskMessages;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.synchronization.plugin.SynchronizationDomain;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
/**
 * This class provides an implementation of a Directory Server task that can
 * be used to import data over the synchronization protocol from another
 * server hosting the same synchronization domain.
 */
public class InitializeTask extends Task
{
  boolean isCompressed            = false;
  boolean isEncrypted             = false;
  boolean skipSchemaValidation    = false;
  String  domainString            = null;
  short  source;
  SynchronizationDomain domain = null;
  TaskState initState;
  // The total number of entries expected to be processed when this import
  // will end successfully
  long total = 0;
  // The number of entries still to be processed for this import to be
  // completed
  long left = 0;
  /**
   * {@inheritDoc}
   */
  @Override public void initializeTask() throws DirectoryException
  {
    // FIXME -- Do we need any special authorization here?
    Entry taskEntry = getTaskEntry();
    AttributeType typeDomainBase;
    AttributeType typeSourceScope;
    typeDomainBase =
      getAttributeType(ATTR_TASK_INITIALIZE_DOMAIN_DN, true);
    typeSourceScope =
      getAttributeType(ATTR_TASK_INITIALIZE_SOURCE, true);
    List<Attribute> attrList;
    attrList = taskEntry.getAttribute(typeDomainBase);
    domainString = TaskUtils.getSingleValueString(attrList);
    DN domainDN = DN.nullDN();
    try
    {
      domainDN = DN.decode(domainString);
    }
    catch(Exception e)
    {
      int    msgID   = TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN;
      String message = getMessage(msgID) + e.getMessage();
      throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX,
          message, msgID);
    }
    domain=SynchronizationDomain.retrievesSynchronizationDomain(domainDN);
    attrList = taskEntry.getAttribute(typeSourceScope);
    String sourceString = TaskUtils.getSingleValueString(attrList);
    source = domain.decodeSource(sourceString);
    createAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
    createAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
  }
  /**
   * {@inheritDoc}
   */
  protected TaskState runTask()
  {
    if (debugEnabled())
    {
      debugInfo("InitializeTask is starting domain: %s source:%d",
                domain.getBaseDN(), source);
    }
    initState = getTaskState(); // RUNNING
    try
    {
      // launch the import
      domain.initialize(source, this);
      synchronized(initState)
      {
        // Waiting for the end of the job
        while (initState == TaskState.RUNNING)
        {
          initState.wait(1000);
          updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
          updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
        }
      }
      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
      updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
    }
    catch(InterruptedException ie) {}
    catch(DirectoryException de)
    {
      int msgID   = de.getErrorMessageID();
      String message = getMessage(msgID, de.getErrorMessage());
      logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      initState = TaskState.STOPPED_BY_ERROR;
    }
    if (debugEnabled())
    {
      debugInfo("InitializeTask is ending with state:%d", initState);
    }
    return initState;
  }
  /**
   * Set the state for the current task.
   *
   * @param newState The new state value to set
   * @param de  When the new state is different from COMPLETED_SUCCESSFULLY
   * this is the exception that contains the cause of the failure.
   */
  public void setState(TaskState newState, DirectoryException de)
  {
    try
    {
      if (de != null)
      {
        int msgID   = de.getErrorMessageID();
        String message = getMessage(msgID, de.getErrorMessage());
        logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
      }
      if (debugEnabled())
      {
        logError(ErrorLogCategory.TASK,
            ErrorLogSeverity.SEVERE_ERROR,
            "setState: "+newState, 1);
        debugInfo("InitializeTask/setState: ", newState);
      }
      initState = newState;
      synchronized (initState)
      {
        initState.notify();
      }
    }
    catch(Exception e)
    {}
  }
  /**
   * Create a new attribute the task entry.
   * @param name The name of the attribute
   * @param value The value to store
   */
  protected void createAttribute(String name, long value)
  {
    AttributeType type;
    LinkedHashSet<AttributeValue> values =
      new LinkedHashSet<AttributeValue>();
    Entry taskEntry = getTaskEntry();
    try
    {
      type = getAttributeType(name, true);
      values.add(new AttributeValue(type,
          new ASN1OctetString(String.valueOf(value))));
      ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
      attrList.add(new Attribute(type, name,values));
      taskEntry.putAttribute(type, attrList);
    }
    finally
    {
      // taskScheduler.unlockEntry(taskEntryDN, lock);
    }
  }
  /**
   * Update an attribute for this task.
   * @param name The name of the attribute.
   * @param value The value.
   * @throws DirectoryException When an error occurs.
   */
  protected void updateAttribute(String name, long value)
  throws DirectoryException
  {
    Entry taskEntry = getTaskEntry();
    ArrayList<Modification> modifications = new ArrayList<Modification>();
    modifications.add(new Modification(ModificationType.REPLACE,
        new Attribute(name, String.valueOf(value))));
    taskEntry.applyModifications(modifications);
  }
  /**
   * Set the total number of entries expected to be imported.
   * @param total The total number of entries.
   */
  public void setTotal(long total)
  {
    this.total = total;
  }
  /**
   * Set the total number of entries still to be imported.
   * @param left The total number of entries to be imported.
   */
  public void setLeft(long left)
  {
    this.left = left;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java
New file
@@ -0,0 +1,1488 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_INITIALIZE_DONE;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_INITIALIZE_LEFT;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigEntry;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.messages.TaskMessages;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.synchronization.changelog.Changelog;
import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.SynchronizationDomain;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SocketSession;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
/**
 * Tests contained here:
 *
 * Initialize Test Cases <=> Pull entries
 * ---------------------
 * InitializeImport : Tests the import in the target DS.
 * Creates a task on current DS and makes a broker simulates DS2 sending entries.
 * InitializeExport : Tests the export from the source DS
 * A broker simulates DS2 pulling entries from current DS.
 *
 * Initialize Target Test Cases <=> Push entries
 * ----------------------------
 * InitializeTargetExport : Tests the export from the source DS
 * Creates a task on current DS and makes broker simulates DS2 receiving entries
 * InitializeTargetImport : Test the import in the target DS
 * A broker simulates DS2 receiving entries from current DS.
 *
 * InitializeTargetConfigErrors : Tests configuration errors of the
 * InitializeTarget task
 */
public class InitOnLineTest extends SynchronizationTestCase
{
  private static final int WINDOW_SIZE = 10;
  private static final int CHANGELOG_QUEUE_SIZE = 100;
  private static final String SYNCHRONIZATION_STRESS_TEST =
    "Synchronization Stress Test";
  /**
   * A "person" entry
   */
  protected Entry personEntry;
  protected Entry taskInitFromS2;
  protected Entry taskInitTargetS2;
  protected Entry taskInitTargetAll;
  SocketSession ssSession = null;
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  boolean externalDS = false;
  short server1ID = 1;
  short server2ID = 2;
  short server3ID = 3;
  short changelog1ID = 12;
  short changelog2ID = 13;
  int changelogPort = 8989;
  private DN baseDn;
  ChangelogBroker server2 = null;
  Changelog changelog1 = null;
  Changelog changelog2 = null;
  boolean emptyOldChanges = true;
  SynchronizationDomain sd = null;
  private void log(String s)
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "InitOnLineTests/" + s, 1);
    if (debugEnabled())
    {
      debugInfo(s);
    }
  }
  protected void log(String message, Exception e)
  {
    log(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment for performing the tests in this Class.
   * synchronization
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  public void setUp() throws Exception
  {
    log("Setup: debugEnabled:" + debugEnabled());
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    baseDn = DN.decode("dc=example,dc=com");
    updatedEntries = newLDIFEntries();
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
    connection = InternalClientConnection.getRootConnection();
    // Synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Synchro multi-master
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    String synchroPluginLdif = "dn: "
      + synchroPluginStringDN
      + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "ds-cfg-synchronization-provider-enabled: true\n"
      + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
    synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
    // Synchro suffix
    synchroServerEntry = null;
    // Add config entries to the current DS server based on :
    // Add the synchronization plugin: synchroPluginEntry & synchroPluginStringDN
    // Add synchroServerEntry
    // Add changeLogEntry
    configureSynchronization();
    taskInitFromS2 = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-replica-server-id: " + server2ID);
    taskInitTargetS2 = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-replica-server-id: " + server2ID);
    taskInitTargetAll = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-replica-server-id: all");
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    changeLogEntry = null;
  }
  // Tests that entries have been written in the db
  private void testEntriesInDb()
  {
    log("TestEntriesInDb");
    short found = 0;
    for (String entry : updatedEntries)
    {
      int dns = entry.indexOf("dn: ");
      int dne = entry.indexOf("dc=com");
      String dn = entry.substring(dns+4,dne+6);
      log("Search Entry: " + dn);
      DN entryDN = null;
      try
      {
        entryDN = DN.decode(dn);
      }
      catch(Exception e)
      {
        log("TestEntriesInDb/" + e);
      }
      try
      {
        Entry resultEntry = getEntry(entryDN, 1000, true);
        if (resultEntry==null)
        {
          log("Entry not found <" + dn + ">");
        }
        else
        {
          log("Entry found <" + dn + ">");
          found++;
        }
      }
      catch(Exception e)
      {
        log("TestEntriesInDb/", e);
      }
    }
    assertEquals(found, updatedEntries.length,
        " Entries present in DB :" + found +
        " Expected entries :" + updatedEntries.length);
  }
  private void addTask(Entry taskEntry, ResultCode expectedResult,
      int errorMessageID)
  {
    try
    {
      log("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(getMessage(errorMessageID).toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + getMessage(errorMessageID) + ">");
        log("Create config task: <"+ errorMessageID + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, -1);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      log("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  /**
   * Wait a task to be completed and check the expected state and expected
   * stats.
   * @param taskEntry The task to process.
   * @param expectedState The expected state fot this task.
   * @param expectedLeft The expected number of entries still to be processed.
   * @param expectedDone The expected numner of entries to be processed.
   */
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
  {
    try
    {
      // FIXME - Factorize with TasksTestCase
      // Wait until the task completes.
      int timeout = 2000;
      AttributeType completionTimeType = DirectoryServer.getAttributeType(
          ATTR_TASK_COMPLETION_TIME.toLowerCase());
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      String completionTime = null;
      long startMillisecs = System.currentTimeMillis();
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible?  Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        completionTime =
          resultEntry.getAttributeValue(completionTimeType,
              DirectoryStringSyntax.DECODER);
        if (completionTime == null)
        {
          if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
          {
            break;
          }
          Thread.sleep(10);
        }
      } while (completionTime == null);
      if (completionTime == null)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
      // Check that the task state is as expected.
      AttributeType taskStateType =
        DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
      String stateString =
        resultEntry.getAttributeValue(taskStateType,
            DirectoryStringSyntax.DECODER);
      TaskState taskState = TaskState.fromString(stateString);
      assertEquals(taskState, expectedState,
          "The task completed in an unexpected state");
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if (taskState != TaskState.COMPLETED_SUCCESSFULLY &&
          logMessages.size() == 0)
      {
        fail("No log messages were written to the task entry on a failed task");
      }
      try
      {
        // Check that the task state is as expected.
        taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
        stateString =
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
        assertEquals(Long.decode(stateString).longValue(),expectedLeft,
            "The number of entries to process is not correct.");
        // Check that the task state is as expected.
        taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
        stateString =
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
        assertEquals(Long.decode(stateString).longValue(),expectedDone,
            "The number of entries processed is not correct.");
      }
      catch(Exception e)
      {
        fail("Exception"+ e.getMessage()+e.getStackTrace());
      }
    }
    catch(Exception e)
    {
      fail("Exception"+ e.getMessage()+e.getStackTrace());
    }
  }
  private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      int expectedMessage)
  {
    TaskState taskState = null;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible?  Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        try
        {
          // Check that the left counter.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
          String leftString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          // Check that the total counter.
          taskStateType =
           DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
          String totalString =
           resultEntry.getAttributeValue(taskStateType,
               DirectoryStringSyntax.DECODER);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(2000);
      }
      while ((taskState != expectedTaskState) &&
          (taskState != TaskState.STOPPED_BY_ERROR));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          if (expectedMessage > 0)
          {
            log(logMessages.get(0));
            log(getMessage(expectedMessage));
            assertTrue(logMessages.get(0).indexOf(
                getMessage(expectedMessage))>0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  private void addTestEntriesToDB()
  {
    try
    {
      for (String ldifEntry : updatedEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        AddOperation addOp = new AddOperation(connection,
            InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
            entry.getUserAttributes(), entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          log("addEntry: Failed" + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /*
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries()
  {
    String[] entries =
    {
        "dn: dc=example,dc=com\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
          "dn: ou=People,dc=example,dc=com\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
          "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
        + "telephonenumber: +1 408 555 1212\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
          "dn: cn=Robert Langman,ou=people,dc=example,dc=com\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: +1 408 555 1213\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
        };
    return entries;
  }
  /**
   * Broker will send the entries to a server.
   * @param broker The broker that will send the entries.
   * @param senderID The serverID of this broker.
   * @param destinationServerID The target server.
   * @param requestorID The initiator server.
   */
  private void makeBrokerPublishEntries(ChangelogBroker broker,
      short senderID, short destinationServerID, short requestorID)
  {
    // Send entries
    try
    {
      RoutableMessage initTargetMessage = new InitializeTargetMessage(
          baseDn, server2ID, destinationServerID, requestorID, updatedEntries.length);
      broker.publish(initTargetMessage);
      for (String entry : updatedEntries)
      {
        log("Broker will pusblish 1 entry: bytes:"+ entry.length());
        EntryMessage entryMsg = new EntryMessage(senderID, destinationServerID,
            entry.getBytes());
        broker.publish(entryMsg);
      }
      DoneMessage doneMsg = new DoneMessage(senderID, destinationServerID);
      broker.publish(doneMsg);
      log("Broker " + senderID + " published entries");
    }
    catch(Exception e)
    {
      fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
  }
  void receiveUpdatedEntries(ChangelogBroker broker, short serverID,
      String[] updatedEntries)
  {
    // Expect the broker to receive the entries
    SynchronizationMessage msg;
    short entriesReceived = 0;
    while (true)
    {
      try
      {
        log("Broker " + serverID + " Wait for entry or done msg");
        msg = broker.receive();
        if (msg == null)
          break;
        if (msg instanceof InitializeTargetMessage)
        {
          log("Broker " + serverID + " receives InitializeTargetMessage ");
          entriesReceived = 0;
        }
        else if (msg instanceof EntryMessage)
        {
          EntryMessage em = (EntryMessage)msg;
          log("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
          entriesReceived++;
        }
        else if (msg instanceof DoneMessage)
        {
          log("Broker " + serverID + "  receives done ");
          break;
        }
        else if (msg instanceof ErrorMessage)
        {
          ErrorMessage em = (ErrorMessage)msg;
          log("Broker " + serverID + "  receives ERROR "
              + getMessage(em.getMsgID())
              + " " + em.getDetails());
          break;
        }
        else
        {
          log("Broker " + serverID + " receives and trashes " + msg);
        }
      }
      catch(Exception e)
      {
        log("receiveUpdatedEntries" + stackTraceToSingleLineString(e));
      }
    }
    assertTrue(entriesReceived == updatedEntries.length,
        " Received entries("+entriesReceived +
        ") == Expected entries("+updatedEntries.length+")");
  }
  /**
   * Creates a new changelog server.
   * @param changelogId The serverID of the changelog to create.
   * @return The new changelog server.
   */
  private Changelog createChangelogServer(short changelogId)
  {
    try
    {
      if ((changelogId==changelog1ID)&&(changelog1!=null))
        return changelog1;
      if ((changelogId==changelog2ID)&&(changelog2!=null))
        return changelog2;
      {
        int chPort = getChangelogPort(changelogId);
        // Create a changelog server
        String changelogLdif = "dn: cn=Changelog Server\n"
          + "objectClass: top\n"
          + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + chPort + "\n"
          + "ds-cfg-changelog-server-id: " + changelogId + "\n"
//        + "ds-cfg-heartbeat-interval: 0 ms\n"
          + "ds-cfg-window-size: 100" + "\n";
        if (changelogId==changelog2ID)
        {
          changelogLdif += new String(
              "ds-cfg-changelog-server: localhost:"
              + getChangelogPort(changelog1ID)+"\n");
        }
        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
        Changelog changelog = new Changelog(changelogConfig);
        Thread.sleep(1000);
        return changelog;
      }
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Create a synchronized suffix in the current server providing the
   * changelog serverID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(short changelogID)
  {
    // Connect DS to the changelog
    try
    {
      // suffix synchronized
      String synchroServerStringDN = synchroPluginStringDN;
      String synchroServerLdif = "dn: cn=example," + synchroServerStringDN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-synchronization-provider-config\n"
      + "cn: example\n"
      + "ds-cfg-synchronization-dn: dc=example,dc=com\n"
      + "ds-cfg-changelog-server: localhost:"
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-directory-server-id: " + server1ID + "\n"
      + "ds-cfg-receive-status: true\n"
//    + "ds-cfg-heartbeat-interval: 0 ms\n"
      + "ds-cfg-window-size: " + WINDOW_SIZE;
      if (synchroServerEntry == null)
      {
        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        entryList.add(synchroServerEntry.getDN());
        sd = SynchronizationDomain.retrievesSynchronizationDomain(baseDn);
        // Clear the backend
        SynchronizationDomain.clearJEBackend(false,
            sd.getBackend().getBackendID(),
            baseDn.toNormalizedString());
      }
      if (sd != null)
      {
         log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
      }
    }
    catch(Exception e)
    {
      log("connectServer1ToChangelog", e);
      fail("connectServer1ToChangelog", e);
    }
  }
  private int getChangelogPort(short changelogID)
  {
    return (changelogPort+changelogID);
  }
  /**
   * Tests the import side of the Initialize task
   */
  @Test(enabled=false)
  public void InitializeImport() throws Exception
  {
    String testCase = "InitializeImport";
    log("Starting "+testCase);
    try
    {
      changelog1 = createChangelogServer(changelog1ID);
      // Connect DS to the changelog
      connectServer1ToChangelog(changelog1ID);
      if (server2 == null)
        server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      Thread.sleep(2000);
      // In S1 launch the total update
      addTask(taskInitFromS2, ResultCode.SUCCESS, 0);
      // S2 should receive init msg
      SynchronizationMessage msg;
      msg = server2.receive();
      if (!(msg instanceof InitializeRequestMessage))
      {
        fail(testCase + " Message received by S2 is of unexpected class" + msg);
      }
      InitializeRequestMessage initMsg = (InitializeRequestMessage)msg;
      // S2 publishes entries to S1
      makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
          initMsg.getsenderID());
      // Wait for task (import) completion in S1
      waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY,
          0, updatedEntries.length);
      // Test import result in S1
      testEntriesInDb();
      cleanEntries();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Tests the export side of the Initialize task
   */
  @Test(enabled=false)
  public void InitializeExport() throws Exception
  {
    String testCase = "Synchronization/InitializeExport";
    log("Starting "+testCase);
    changelog1 = createChangelogServer(changelog1ID);
    // Connect DS to the changelog
    connectServer1ToChangelog(changelog1ID);
    addTestEntriesToDB();
    if (server2 == null)
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    Thread.sleep(3000);
    InitializeRequestMessage initMsg = new InitializeRequestMessage(baseDn,
        server2ID, server1ID);
    server2.publish(initMsg);
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    log("Successfully ending "+testCase);
}
  /**
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetExport() throws Exception
  {
    String testCase = "Synchronization/InitializeTargetExport";
    log("Starting " + testCase);
    changelog1 = createChangelogServer(changelog1ID);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
    // Add in S1 the entries to be exported
    addTestEntriesToDB();
    // S1 is the server we are running in, S2 is simulated by a broker
    if (server2 == null)
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    Thread.sleep(1000);
    // Launch in S1 the task that will initialize S2
    addTask(taskInitTargetS2, ResultCode.SUCCESS, 0);
    // Wait for task completion
    waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, -1);
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    log("Successfully ending " + testCase);
  }
  /**
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetExportAll() throws Exception
  {
    String testCase = "Synchronization/InitializeTargetExportAll";
    log("Starting " + testCase);
    changelog1 = createChangelogServer(changelog1ID);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
    // Add in S1 the entries to be exported
    addTestEntriesToDB();
    // S1 is the server we are running in, S2 and S3 are simulated by brokers
    if (server2==null)
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    ChangelogBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    Thread.sleep(1000);
    // Launch in S1 the task that will initialize S2
    addTask(taskInitTargetAll, ResultCode.SUCCESS, 0);
    // Wait for task completion
    waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, -1);
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
    cleanEntries();
    log("Successfully ending " + testCase);
  }
 /**
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetImport() throws Exception
  {
    String testCase = "InitializeTargetImport";
    try
    {
      log("Starting " + testCase + " debugEnabled:" + debugEnabled());
      // Start SS
      changelog1 = createChangelogServer(changelog1ID);
      // S1 is the server we are running in, S2 is simulated by a broker
      if (server2==null)
        server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
      // S2 publishes entries to S1
      makeBrokerPublishEntries(server2, server2ID, server1ID, server2ID);
      Thread.sleep(10000); // FIXME - how to know the import is done
      // Test that entries have been imported in S1
      testEntriesInDb();
      cleanEntries();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetConfigErrors() throws Exception
  {
    String testCase = "InitializeTargetConfigErrors";
    try
    {
      log("Starting " + testCase);
      // Invalid domain base dn
      Entry taskInitTarget = TestCaseUtils.makeEntry(
          "dn: ds-task-id=" + UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
          "ds-task-initialize-domain-dn: foo",
          "ds-task-initialize-remote-replica-server-id: " + server2ID);
      addTask(taskInitTarget, ResultCode.INVALID_DN_SYNTAX,
          TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
      // Domain base dn not related to any domain
      taskInitTarget = TestCaseUtils.makeEntry(
          "dn: ds-task-id=" + UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
          "ds-task-initialize-domain-dn: dc=foo",
          "ds-task-initialize-remote-replica-server-id: " + server2ID);
      addTask(taskInitTarget, ResultCode.OTHER,
          LogMessages.MSGID_NO_MATCHING_DOMAIN);
      // Invalid scope
      // createTask(taskInitTargetS2);
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      cleanEntries();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeConfigErrors() throws Exception
  {
    String testCase = "InitializeConfigErrors";
    try
    {
      log("Starting " + testCase);
      // Start SS
      changelog1 = createChangelogServer(changelog1ID);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
      // Invalid domain base dn
      Entry taskInit = TestCaseUtils.makeEntry(
          "dn: ds-task-id=" + UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: foo",
          "ds-task-initialize-source: " + server2ID);
      addTask(taskInit, ResultCode.INVALID_DN_SYNTAX,
          TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
      // Domain base dn not related to any domain
      taskInit = TestCaseUtils.makeEntry(
          "dn: ds-task-id=" + UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: dc=foo",
          "ds-task-initialize-source: " + server2ID);
      addTask(taskInit, ResultCode.OTHER, LogMessages.MSGID_NO_MATCHING_DOMAIN);
      // Invalid Source
      taskInit = TestCaseUtils.makeEntry(
          "dn: ds-task-id=" + UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: " + baseDn,
          "ds-task-initialize-source: -3");
      addTask(taskInit, ResultCode.OTHER,
          LogMessages.MSGID_INVALID_IMPORT_SOURCE);
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      cleanEntries();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  @Test(enabled=false)
  public void InitializeTargetBroken() throws Exception
  {
    String testCase = "InitializeTargetBroken";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeBroken() throws Exception
  {
    String testCase = "InitializeBroken";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetExportMultiSS() throws Exception
  {
    String testCase = "Synchronization/InitializeTargetExportMultiSS";
    log("Starting " + testCase);
    // Create 2 changelogs
    changelog1 = createChangelogServer(changelog1ID);
    changelog2 = createChangelogServer(changelog2ID);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
    // Add in S1 the entries to be exported
    addTestEntriesToDB();
    // S1 is the server we are running in, S2 is simulated by a broker
    // connected to changelog2
    if (server2 == null)
    {
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    }
    Thread.sleep(1000);
    // Launch in S1 the task that will initialize S2
    addTask(taskInitTargetS2, ResultCode.SUCCESS, 0);
    // Wait for task completion
    waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, -1);
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    changelog2.shutdown();
    changelog2 = null;
    log("Successfully ending " + testCase);
  }
  @Test(enabled=false)
  public void InitializeExportMultiSS() throws Exception
  {
    String testCase = "Synchronization/InitializeExportMultiSS";
    log("Starting "+testCase);
    // Create 2 changelogs
    changelog1 = createChangelogServer(changelog1ID);
    Thread.sleep(3000);
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(3000);
    // Connect DS to the changelog 1
    connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
    addTestEntriesToDB();
    // Connect a broker acting as server 2 to changelog2
    if (server2 == null)
    {
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID),
        1000, emptyOldChanges);
    }
    Thread.sleep(3000);
    // S2 sends init request
    InitializeRequestMessage initMsg =
      new InitializeRequestMessage(baseDn, server2ID, server1ID);
    server2.publish(initMsg);
    // S2 should receive target, entries & done
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    changelog2.shutdown();
    changelog2 = null;
    log("Successfully ending "+testCase);
  }
  @Test(enabled=false)
  public void InitializeNoSource() throws Exception
  {
    String testCase = "InitializeNoSource";
    log("Starting "+testCase);
    // Start SS
    changelog1 = createChangelogServer(changelog1ID);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
    Entry taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-replica-server-id: " + 20);
    addTask(taskInit, ResultCode.SUCCESS, 0);
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        LogMessages.MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN);
    if (sd != null)
    {
       log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
    }
    log("Successfully ending "+testCase);
  }
  @Test(enabled=false)
  public void InitializeTargetNoTarget() throws Exception
  {
    String testCase = "InitializeTargetNoTarget"  + baseDn;
    log("Starting "+testCase);
    // Start SS
    changelog1 = createChangelogServer(changelog1ID);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
    addTestEntriesToDB();
    Entry taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-target",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-target-domain-dn: "+baseDn,
        "ds-task-initialize-target-scope: " + 10);
    addTask(taskInit, ResultCode.SUCCESS, 0);
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        LogMessages.MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN);
    if (sd != null)
    {
       log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
    }
    log("Successfully ending "+testCase);
  }
  @Test(enabled=false)
  public void InitializeStopped() throws Exception
  {
    String testCase = "InitializeStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetStopped() throws Exception
  {
    String testCase = "InitializeTargetStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeCompressed() throws Exception
  {
    String testCase = "InitializeStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetEncrypted() throws Exception
  {
    String testCase = "InitializeTargetCompressed";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeSimultaneous() throws Exception
  {
    String testCase = "InitializeSimultaneous";
    // Start SS
    changelog1 = createChangelogServer(changelog1ID);
    // Connect a broker acting as server 2 to changelog2
    if (server2 == null)
    {
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID),
        1000, emptyOldChanges);
    }
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
    Entry taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-replica-server-id: " + server2ID);
    addTask(taskInit, ResultCode.SUCCESS, 0);
    Thread.sleep(3000);
    Entry taskInit2 = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-replica-server-id: " + server2ID);
    // Second task is expected to be rejected
    addTask(taskInit2, ResultCode.SUCCESS, 0);
    waitTaskState(taskInit2, TaskState.STOPPED_BY_ERROR,
        LogMessages.MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED);
    // First task is stilll running
    waitTaskState(taskInit, TaskState.RUNNING, -1);
    // External request is supposed to be rejected
    // Now tests error in the middle of an import
    // S2 sends init request
    ErrorMessage msg =
      new ErrorMessage(server1ID, 1, "");
    server2.publish(msg);
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        1);
    cleanEntries();
    log("Successfully ending "+testCase);
  }
  /**
   * Disconnect broker and remove entries from the local DB
   * @throws Exception
   */
  protected void cleanEntries()
  {
    if (sd != null)
    {
       log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
    }
    // Clean brokers
    if (server2 != null)
    {
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      // fromthe changelog server.
      server2 = null;
    }
    super.cleanEntries();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,6 +26,7 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -54,6 +55,8 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ByteStringFactory;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -158,7 +161,11 @@
        }
      }
      catch (Exception e)
      { }
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1);
      }
    }
    return broker;
  }
@@ -221,7 +228,8 @@
        }
      }
      catch (Exception e)
      { }
      {
      }
    }
    return broker;
  }
@@ -231,6 +239,10 @@
   */
  protected void cleanEntries()
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "SynchronizationTestCase/Cleaning entries" , 1);
    DeleteOperation op;
    // Delete entries
    try
@@ -238,6 +250,10 @@
      while (true)
      {
        DN dn = entryList.removeLast();
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "cleaning entry " + dn, 1);
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
@@ -264,8 +280,13 @@
    // WORKAROUND FOR BUG #639 - BEGIN -
    if (mms != null)
    {
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationTestCase/FinalizeSynchronization Provider" , 1);
      DirectoryServer.deregisterSynchronizationProvider(mms);
      mms.finalizeSynchronizationProvider();
      mms = null;
    }
    // WORKAROUND FOR BUG #639 - END -
@@ -303,17 +324,22 @@
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
    if (changeLogEntry!=null)
    {
      DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry.getDN());
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
    entryList.add(synchroServerEntry.getDN());
      entryList.add(changeLogEntry.getDN());
    }
    if (synchroServerEntry!=null)
    {
      // We also have a replicated suffix (synchronization domain)
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
      "Unable to add the synchronized suffix");
      entryList.add(synchroServerEntry.getDN());
    }
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
          + "ds-cfg-changelog-db-directory: changelogDb"+i;
        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
        changelogs[i] = new Changelog(changelogConfig);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,16 +26,18 @@
 */
package org.opends.server.synchronization.protocol;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
import java.util.zip.DataFormatException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -56,8 +58,8 @@
import org.opends.server.types.ObjectClass;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
 * Test the contructors, encoders and decoders of the synchronization
@@ -520,6 +522,104 @@
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
  @Test()
  public void EntryMessageTest() throws Exception
  {
    String taskInitFromS2 = new String(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks\n" +
        "objectclass: top\n" +
        "objectclass: ds-task\n" +
        "objectclass: ds-task-initialize\n" +
        "ds-task-class-name: org.opends.server.tasks.InitializeTask" +
        "ds-task-initialize-domain-dn: dc=example,dc=com" +
        "ds-task-initialize-source: 1");
    short sender = 1;
    short target = 2;
    byte[] entry = taskInitFromS2.getBytes();
    EntryMessage msg = new EntryMessage(sender, target, entry);
    EntryMessage newMsg = new EntryMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
  }
  /**
   * Test that InitializeRequestMessage encoding and decoding works
   */
  @Test()
  public void InitializeRequestMessageTest() throws Exception
  {
    short sender = 1;
    short target = 2;
    InitializeRequestMessage msg = new InitializeRequestMessage(
        DN.decode("dc=example"), sender, target);
    InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
  }
  /**
   * Test that InitializeTargetMessage encoding and decoding works
   */
  @Test()
  public void InitializeTargetMessageTest() throws Exception
  {
    short senderID = 1;
    short targetID = 2;
    short requestorID = 3;
    long entryCount = 4;
    DN baseDN = DN.decode("dc=example");
    InitializeTargetMessage msg = new InitializeTargetMessage(
        baseDN, senderID, targetID, requestorID, entryCount);
    InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
    assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
    assertEquals(senderID, newMsg.getsenderID());
    assertEquals(targetID, newMsg.getDestination());
    assertEquals(requestorID, newMsg.getRequestorID());
    assertEquals(entryCount, newMsg.getEntryCount());
    assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
  }
  /**
   * Test that DoneMessage encoding and decoding works
   */
  @Test()
  public void DoneMessage() throws Exception
  {
    DoneMessage msg = new DoneMessage((short)1, (short)2);
    DoneMessage newMsg = new DoneMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
  }
  /**
   * Test that ErrorMessage encoding and decoding works
   */
  @Test()
  public void ErrorMessage() throws Exception
  {
    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
    assertEquals(msg.getDetails(), newMsg.getDetails());
  }
  /**
   * Test PendingChange
   */
  private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)