mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
29.53.2007 40cef7d36084fbe86d34cfa497628d8972c4c9e7
opends/resource/schema/02-config.ldif
@@ -1045,6 +1045,10 @@
  NAME 'ds-cfg-heartbeat-interval'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306
  NAME 'ds-cfg-changelog-db-directory'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307
  NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' )
@@ -1103,6 +1107,22 @@
  NAME 'ds-cfg-case-sensitive-validation' 
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE 
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332
  NAME 'ds-task-initialize-domain-dn'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333
  NAME 'ds-task-initialize-replica-server-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334
  NAME 'ds-task-unprocessed-entry-count'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335
  NAME 'ds-task-processed-entry-count'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
  MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1425,7 +1445,8 @@
  'ds-cfg-synchronization-changelog-server-config' SUP top
  STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
  ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
  SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
  X-ORIGIN 'OpenDS Directory Server' )
@@ -1532,4 +1553,14 @@
  SUP ds-cfg-password-validator STRUCTURAL
  MUST ( ds-cfg-maximum-consecutive-length $ ds-cfg-case-sensitive-validation )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.91
  NAME 'ds-task-initialize-from-remote-replica' SUP ds-task
  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.92
  NAME 'ds-task-initialize-remote-replica' SUP ds-task
  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
  X-ORIGIN 'OpenDS Directory Server' )
opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,6 +3665,62 @@
       NAME_PREFIX_TASK + "import-is-encrypted";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * initialize task definition.
   */
  public static final String OC_INITIALIZE_TASK =
    NAME_PREFIX_TASK + "initialize-from-remote-replica";
  /**
   * The name of the attribute in an initialize task definition that specifies
   * the base dn related to the synchonization domain to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN =
       NAME_PREFIX_TASK + "initialize-domain-dn";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the source in terms of source server from which to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_SOURCE =
       NAME_PREFIX_TASK + "initialize-replica-server-id";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * initialize target task definition.
   */
  public static final String OC_INITIALIZE_TARGET_TASK =
    NAME_PREFIX_TASK + "initialize-remote-replica";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the base dn related to the synchonization domain to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN =
       NAME_PREFIX_TASK + "initialize-domain-dn";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE =
       NAME_PREFIX_TASK + "initialize-replica-server-id";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_LEFT =
       NAME_PREFIX_TASK + "unprocessed-entry-count";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_DONE =
       NAME_PREFIX_TASK + "processed-entry-count";
  /**
   * The name of the objectclass that will be used for a Directory Server
opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,7 +211,19 @@
  public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17;
  /**
   * The message ID for the message that will be used when an invalid domain
   * base DN is provided as argument to the initialize target task.
   */
  public static final int  MSGID_TASK_INITIALIZE_TARGET_INVALID_DN =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18;
  /**
   * The message ID for the message that will be used when an invalid domain
   * base DN is provided as argument to the initialize task.
   */
  public static final int  MSGID_TASK_INITIALIZE_INVALID_DN =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19;
  /**
   * Associates a set of generic messages with the message IDs defined in this
@@ -279,6 +291,11 @@
    registerMessage(MSGID_TASK_LDIFEXPORT_INSUFFICIENT_PRIVILEGES,
                    "You do not have sufficient privileges to initiate an " +
                    "LDIF export.");
    registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN,
                    "Invalid DN provided with the Initialize Target task.");
    registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN,
                    "Invalid DN provided with the Initialize task.");
  }
}
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
  static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import com.sleepycat.je.DatabaseException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import com.sleepycat.je.DatabaseException;
/**
 * This class define an in-memory cache that will be used to store
 * the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
    }
  }
  /**
   * Send back an ack to the server that sent the change.
   * Retrieves the destination handlers for a routable message.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param msg The message to route.
   * @param senderHandler The handler of the server that published this message.
   * @return The list of destination handlers.
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  {
    short serverId = changeNumber.getServerId();
    sendAck(changeNumber, isLDAPserver, serverId);
  }
  /**
   *
   * Send back an ack to a server that sent the change.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param serverId     The identifier of the server from which we
   *                     received the change..
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
                      short serverId)
  {
    ServerHandler handler;
    if (isLDAPserver)
      handler = connectedServers.get(serverId);
    else
      handler = changelogServers.get(serverId);
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    // TODO : check for null handler and log error
    try
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      handler.sendAck(changeNumber);
    } catch (IOException e)
    {
      /*
       * An error happened trying the send back an ack to this server.
       * Log an error and close the connection to this server.
       */
      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
      String message = getMessage(msgID, this.toString())
                                  + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      handler.shutdown();
      // TODO Import from the "closest server" to be implemented
    }
  }
  /**
   * Shutdown this ChangelogCache.
   */
  public void shutdown()
  {
    // Close session with other changelogs
    for (ServerHandler serverHandler : changelogServers.values())
    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
    {
      serverHandler.shutdown();
    }
    // Close session with other LDAP servers
    for (ServerHandler serverHandler : connectedServers.values())
    {
      serverHandler.shutdown();
    }
    // Shutdown the dbHandlers
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      if (!senderHandler.isChangelogServer())
      {
        dbHandler.shutdown();
        // Send to all changelogServers
        for (ServerHandler destinationHandler : changelogServers.values())
        {
          servers.add(destinationHandler);
        }
      }
      sourceDbHandlers.clear();
      // Send to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
          continue;
        servers.add(destinationHandler);
      }
    }
    else
    {
      // Destination is one server
      ServerHandler destinationHandler =
        connectedServers.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
      }
      else
      {
        // the targeted server is NOT connected
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(changelogServers.values());
        }
      }
    }
    return servers;
  }
  /**
   * Returns the ServerState describing the last change from this replica.
   * Process an InitializeRequestMessage.
   *
   * @return The ServerState describing the last change from this replica.
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
   * the message.
   */
  public ServerState getDbServerState()
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    ServerState serverState = new ServerState();
    for (DbHandler db : sourceDbHandlers.values())
    {
      serverState.update(db.getLastChange());
    }
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    return "ChangelogCache " + baseDn;
  }
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    if (servers.isEmpty())
    {
      handler.checkWindow();
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
        }
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        }
      }
      return;
    }
    for (ServerHandler handler : connectedServers.values())
    for (ServerHandler targetHandler : servers)
    {
      handler.checkWindow();
      try
      {
        targetHandler.send(msg);
      }
      catch(IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
      }
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    /**
     * Send back an ack to the server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
    {
      if (!handler.restartAfterSaturation(sourceHandler))
      short serverId = changeNumber.getServerId();
      sendAck(changeNumber, isLDAPserver, serverId);
    }
    /**
     *
     * Send back an ack to a server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     * @param serverId     The identifier of the server from which we
     *                     received the change..
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
        short serverId)
    {
      ServerHandler handler;
      if (isLDAPserver)
        handler = connectedServers.get(serverId);
      else
        handler = changelogServers.get(serverId);
      // TODO : check for null handler and log error
      try
      {
        handler.sendAck(changeNumber);
      } catch (IOException e)
      {
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        handler.shutdown();
      }
    }
    /**
     * Shutdown this ChangelogCache.
     */
    public void shutdown()
    {
      // Close session with other changelogs
      for (ServerHandler serverHandler : changelogServers.values())
      {
        serverHandler.shutdown();
      }
      // Close session with other LDAP servers
      for (ServerHandler serverHandler : connectedServers.values())
      {
        serverHandler.shutdown();
      }
      // Shutdown the dbHandlers
      synchronized (sourceDbHandlers)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        {
          dbHandler.shutdown();
        }
        sourceDbHandlers.clear();
      }
    }
    /**
     * Returns the ServerState describing the last change from this replica.
     *
     * @return The ServerState describing the last change from this replica.
     */
    public ServerState getDbServerState()
    {
      ServerState serverState = new ServerState();
      for (DbHandler db : sourceDbHandlers.values())
      {
        serverState.update(db.getLastChange());
      }
      return serverState;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      return "ChangelogCache " + baseDn;
    }
    /**
     * Check if some server Handler should be removed from flow control state.
     * @throws IOException If an error happened.
     */
    public void checkAllSaturation() throws IOException
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.checkWindow();
      }
      for (ServerHandler handler : connectedServers.values())
      {
        handler.checkWindow();
      }
    }
    /**
     * Check if a server that was in flow control can now restart
     * sending updates.
     * @param sourceHandler The server that must be checked.
     * @return true if the server can restart sending changes.
     *         false if the server can't restart sending changes.
     */
    public boolean restartAfterSaturation(ServerHandler sourceHandler)
    {
      for (ServerHandler handler : changelogServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
      }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
      for (ServerHandler handler : connectedServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
      }
      return true;
    }
    return true;
  }
}
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -44,6 +46,18 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -51,17 +65,6 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +111,7 @@
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private short changelogId;
  /**
   * The time in milliseconds between heartbeats from the synchronization
@@ -155,6 +159,7 @@
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  {
    this.changelogId = changelogId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
@@ -1263,4 +1268,40 @@
  {
    return heartbeatInterval;
  }
  /**
   * Processes a routable message.
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
    changelogCache.process(msg, this);
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
    session.publish(msg);
  }
}
opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,6 +34,11 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
@@ -116,6 +121,33 @@
          WindowMessage windowMsg = (WindowMessage) msg;
          handler.updateWindow(windowMsg);
        }
        else if (msg instanceof InitializeRequestMessage)
        {
          InitializeRequestMessage initializeMsg =
            (InitializeRequestMessage) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof InitializeTargetMessage)
        {
          InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof EntryMessage)
        {
          EntryMessage entryMsg = (EntryMessage) msg;
          handler.process(entryMsg);
        }
        else if (msg instanceof DoneMessage)
        {
          DoneMessage doneMsg = (DoneMessage) msg;
          handler.process(doneMsg);
        }
        else if (msg instanceof ErrorMessage)
        {
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
      }
    } catch (IOException e)
    {
opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,13 +317,61 @@
    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
  /**
   * The message id for thedescription of the attribute used to configure
   * The message id for the description of the attribute used to configure
   * the purge delay of the Changelog Servers.
   */
  public static final int MSGID_PURGE_DELAY_ATTR =
    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
  /**
   * The message id for the error raised when export/import
   * is rejected due to an export/import already in progress.
   */
  public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44;
  /**
   * The message id for the error raised when import
   * is rejected due to an invalid source of data imported.
   */
  public static final int MSGID_INVALID_IMPORT_SOURCE =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45;
  /**
   * The message id for the error raised when export
   * is rejected due to an invalid target to export datas.
   */
  public static final int MSGID_INVALID_EXPORT_TARGET =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46;
  /**
   * The message id for the error raised when import/export message
   * cannot be routed to an up-and-running target in the domain.
   */
  public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47;
  /**
   * The message ID for the message that will be used when no domain
   * can be found matching the provided domain base DN.
   */
  public static final int  MSGID_NO_MATCHING_DOMAIN =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48;
  /**
   * The message ID for the message that will be used when no domain
   * can be found matching the provided domain base DN.
   */
  public static final int  MSGID_MULTIPLE_MATCHING_DOMAIN
       = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49;
  /**
   * The message ID for the message that will be used when the domain
   * belongs to a provider class that does not allow the export.
   */
  public static final int  MSGID_INVALID_PROVIDER =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50;
  /**
   * Register the messages from this class in the core server.
@@ -449,5 +497,20 @@
        " restored because changelog servers would not be able to refresh" +
        " LDAP servers with older versions of the data. A zero value" +
        " can be used to specify an infinite delay (or never purge).");
    MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
        "The current request is rejected due to an import or an export" +
        " already in progress for the same data.");
    MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE,
        "Invalid source for the import.");
    MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET,
        "Invalid target for the export.");
    MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
        "No reachable peer in the domain.");
    MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN,
        "No domain matches the base DN provided.");
    MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN,
        "Multiple domains match the base DN provided.");
    MessageHandler.registerMessage(MSGID_INVALID_PROVIDER,
        "The provider class does not allow the operation requested.");
  }
}
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -345,6 +347,10 @@
          {
            if (session != null)
            {
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  "Broker : connect closing session" , 1);
              session.close();
              session = null;
            }
@@ -498,6 +504,7 @@
      try
      {
        SynchronizationMessage msg = session.receive();
        if (msg instanceof WindowMessage)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
@@ -544,6 +551,11 @@
    connected = false;
    try
    {
      if (debugEnabled())
      {
        debugInfo("ChangelogBroker Stop Closing session");
      }
      session.close();
    } catch (IOException e)
    {}
@@ -682,4 +694,12 @@
  {
    return numLostConnections;
  }
  private void log(String message)
  {
    int    msgID   = MSGID_UNKNOWN_TYPE;
    logError(ErrorLogCategory.SYNCHRONIZATION,
           ErrorLogSeverity.SEVERE_ERROR,
           message, msgID);
  }
}
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,12 +27,14 @@
package org.opends.server.synchronization.plugin;
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import java.io.IOException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
/**
 * This class implements a thread to monitor heartbeat messages from the
 * synchronization server.  Each broker runs one of these threads.
@@ -103,6 +105,9 @@
        long lastReceiveTime = session.getLastReceiveTime();
        if (now > lastReceiveTime + 2 * heartbeatInterval)
        {
          debugInfo("Heartbeat monitor is closing the broker session " +
          "because it could not detect a heartbeat.");
          // Heartbeat is well overdue so the server is assumed to be dead.
          if (debugEnabled())
          {
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@
   *             Can be null is the request has no associated operation.
   * @return     The Synchronization domain for this DN.
   */
  private static SynchronizationDomain findDomain(DN dn, Operation op)
  public static SynchronizationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special synchronization code on Operation that are
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,7 +233,8 @@
    {
      int msgID = MSGID_ERROR_UPDATING_RUV;
      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
          op.toString(), op.getErrorMessage(), baseDn.toString());
          op.toString(), op.getErrorMessage(), baseDn.toString(),
          Thread.currentThread().getStackTrace());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,47 +26,59 @@
 */
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_ABBR;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_FULL;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.ConfigMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -77,19 +89,30 @@
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -137,8 +160,96 @@
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  short serverId;
  private short serverId;
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   */
  private class IEContext
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    SynchroLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMessage.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry left to be processed
    long entryLeftCount = 0;
    // The exception raised when any
    DirectoryException exception = null;
    /**
     * Initializes the counters of the task with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initTaskCounters(long count)
    {
      entryCount = count;
      entryLeftCount = count;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
      }
    }
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     */
    public void updateTaskCounters()
    {
      entryLeftCount--;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
        }
      }
    }
    /**
     * Update the state of the task.
     */
    protected TaskState updateTaskCompletionState()
    {
      if (exception == null)
        return TaskState.COMPLETED_SUCCESSFULLY;
      else
        return TaskState.STOPPED_BY_ERROR;
    }
  }
  // The context related to an import or export being processed
  // Null when none is being processed.
  private IEContext ieContext = null;
  // The backend informations necessary to make an import or export.
  private Backend backend;
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
@@ -160,6 +271,7 @@
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -205,8 +317,6 @@
    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
  }
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   *
@@ -217,6 +327,7 @@
  public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
  {
    super("Synchronization flush");
    /*
     * read the centralized changelog server configuration
     * this is a multivalued attribute
@@ -397,6 +508,10 @@
        if (!receiveStatus)
          broker.suspendReceive();
      }
      // Retrieves the related backend and its config entry
      retrievesBackendInfos(baseDN);
    } catch (Exception e)
    {
     /* TODO should mark that changelog service is
@@ -803,9 +918,9 @@
  }
  /**
   * Receive an update message from the changelog.
   * Receives an update message from the changelog.
   * also responsible for updating the list of pending changes
   * @return the received message
   * @return the received message - null if none
   */
  public UpdateMessage receive()
  {
@@ -823,7 +938,7 @@
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
@@ -834,6 +949,56 @@
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
            }
            catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
            }
            catch(DirectoryException de)
            {
              // Return an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  changelog did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
@@ -876,7 +1041,9 @@
  public void receiveAck(AckMessage ack)
  {
    UpdateMessage update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    ChangeNumber changeNumber;
    changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    {
@@ -1105,7 +1272,7 @@
        synchronized (this)
        {
          this.wait(1000);
          if (!disabled )
          if (!disabled && !stateSavingDisabled )
          {
            // save the RUV
            state.save();
@@ -1151,6 +1318,8 @@
      this.notify();
    }
    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
    // stop the ChangelogBroker
    broker.stop();
  }
@@ -1857,6 +2026,7 @@
    return broker.getNumLostConnections();
  }
  /**
   * Check if the domain solve conflicts.
   *
@@ -1933,6 +2103,988 @@
    // Nothing is needed at the moment
  }
  /*
   * Total Update >>
   */
  /**
   * Receives bytes related to an entry in the context of an import to
   * initialize the domain (called by SynchronizationDomainLDIFInputStream).
   *
   * @return The bytes. Null when the Done or Err message has been received
   */
  public byte[] receiveEntryBytes()
  {
    SynchronizationMessage msg;
    while (true)
    {
      try
      {
        msg = broker.receive();
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        log("receiveEntryBytes: received " + msg);
        if (msg instanceof EntryMessage)
        {
          // FIXME
          EntryMessage entryMsg = (EntryMessage)msg;
          byte[] entryBytes = entryMsg.getEntryBytes().clone();
          ieContext.updateTaskCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          return null;
        }
        else if (msg instanceof ErrorMessage)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ieContext.exception = new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails() , errorMsg.getMsgID());
          return null;
        }
        else
        {
          // Other messages received during an import are trashed
        }
      }
      catch(Exception e)
      {
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            "received an unexpected message type" , 1, e);
      }
      return null;
    }
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * @param errorMsg The error message received.
   */
  protected void abandonImportExport(ErrorMessage errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
          errorMsg.getDetails() , errorMsg.getMsgID());
      if (ieContext.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
        ieContext = null;
      }
    }
  }
  /**
   * Clears all the entries from the JE backend determined by the
   * be id passed into the method.
   *
   * @param  createBaseEntry  Indicate whether to automatically create the base
   *                          entry and add it to the backend.
   * @param beID  The be id to clear.
   * @param dn   The suffix of the backend to create if the the createBaseEntry
   *             boolean is true.
   * @throws Exception  If an unexpected problem occurs.
   */
  public static void clearJEBackend(boolean createBaseEntry, String beID,
      String dn) throws Exception
  {
    BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
    DN[] baseDNs = backend.getBaseDNs();
    // FIXME Should getConfigEntry be part of TaskUtils ?
    ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
    // FIXME Should setBackendEnabled be part of TaskUtils ?
    TaskUtils.setBackendEnabled(configEntry, false);
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
      {
        throw new RuntimeException(failureReason.toString());
      }
      try
      {
        backend.clearBackend(configEntry, baseDNs);
      }
      finally
      {
        LockFileManager.releaseLock(lockFile, failureReason);
      }
    }
    finally
    {
      TaskUtils.setBackendEnabled(configEntry, true);
    }
    if (createBaseEntry)
    {
      DN baseDN = DN.decode(dn);
      Entry e = createEntry(baseDN);
      backend = (BackendImpl)DirectoryServer.getBackend(beID);
      backend.addEntry(e, null);
    }
  }
  /**
   * Log debug message.
   * @param message The message to log.
   */
  private void log(String message)
  {
    if (debugEnabled())
    {
      debugInfo("DebugInfo" + message);
      int    msgID   = MSGID_UNKNOWN_TYPE;
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationDomain/ " + message, msgID);
    }
  }
  /**
   * Export the entries.
   * @throws DirectoryException when an error occured
   */
  protected void exportBackend() throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            String.valueOf(failureReason));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message + " " + stackTraceToSingleLineString(e), msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    //  Launch the export.
    try
    {
      DN[] baseDNs = {this.baseDN};
      backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, de.getErrorMessage());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, stackTraceToSingleLineString(e));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    finally
    {
      //  Clean up after the export by closing the export config.
      exportConfig.close();
      //  Release the shared lock on the backend.
      try
      {
        String lockFile = LockFileManager.getBackendLockFileName(backend);
        StringBuilder failureReason = new StringBuilder();
        if (! LockFileManager.releaseLock(lockFile, failureReason))
        {
          int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
          String message = getMessage(msgID, backend.getBackendID(),
              String.valueOf(failureReason));
          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
              message, msgID);
          throw new DirectoryException(
              ResultCode.OTHER, message, msgID, null);
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            stackTraceToSingleLineString(e));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
  }
  /**
   * Retrieves the backend object related to the domain and the backend's
   * config entry. They will be used for import and export.
   * TODO This should be in a shared package rather than here.
   *
   * @param baseDN The baseDN to retrieve the backend
   * @throws DirectoryException when an error occired
   */
  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
  {
    ArrayList<Backend>     backendList = new ArrayList<Backend>();
    ArrayList<ConfigEntry> entryList   = new ArrayList<ConfigEntry>();
    ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
    Backend backend = null;
    ConfigEntry backendConfigEntry = null;
    List<DN> branches = new ArrayList<DN>(0);
    // Retrieves the backend related to this domain
    Backend domainBackend = DirectoryServer.getBackend(baseDN);
    if (domainBackend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    // Retrieves its config entry and its DNs
    int code = getBackends(backendList, entryList, dnList);
    if (code != 0)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    int numBackends = backendList.size();
    for (int i=0; i < numBackends; i++)
    {
      Backend b = backendList.get(i);
      if (domainBackend.getBackendID() != b.getBackendID())
      {
        continue;
      }
      if (backend == null)
      {
        backend = domainBackend;
        backendConfigEntry = entryList.get(i).duplicate();
        branches = dnList.get(i);
      }
      else
      {
        int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
        String message = getMessage(msgID, domainBackend.getBackendID());
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    if (backend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
      String message = getMessage(msgID, domainBackend.getBackendID());
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    else if (! backend.supportsLDIFExport())
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_IMPORT;
      String message = getMessage(msgID, 0); // FIXME
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    this.backend = backend;
    this.backendConfigEntry = backendConfigEntry;
    this.branches = branches;
  }
  /**
   * Sends lDIFEntry entry lines to the export target currently set.
   *
   * @param lDIFEntry The lines for the LDIF entry.
   * @throws IOException when an error occured.
   */
  public void sendEntryLines(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // we just let down the export.
    if (ieContext.exception != null)
    {
      IOException ioe = new IOException(ieContext.exception.getMessage());
      ieContext = null;
      throw ioe;
    }
    // new entry then send the current one
    EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    ieContext.updateTaskCounters();
  }
  /**
   * Retrieves information about the backends defined in the Directory Server
   * configuration.
   *
   * @param  backendList  A list into which instantiated (but not initialized)
   *                      backend instances will be placed.
   * @param  entryList    A list into which the config entries associated with
   *                      the backends will be placed.
   * @param  dnList       A list into which the set of base DNs for each backend
   *                      will be placed.
   */
  private static int getBackends(ArrayList<Backend> backendList,
                                 ArrayList<ConfigEntry> entryList,
                                 ArrayList<List<DN>> dnList)
  throws DirectoryException
  {
    //  Get the base entry for all backend configuration.
    DN backendBaseDN = null;
    try
    {
      backendBaseDN = DN.decode(DN_BACKEND_BASE);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    ConfigEntry baseEntry = null;
    try
    {
      baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
    }
    catch (ConfigException ce)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    //  Iterate through the immediate children, attempting to parse them as
    //  backends.
    for (ConfigEntry configEntry : baseEntry.getChildren().values())
    {
      // Get the backend ID attribute from the entry.  If there isn't one, then
      // skip the entry.
      String backendID = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
        StringConfigAttribute idStub =
          new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
              true, false, true);
        StringConfigAttribute idAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
        if (idAttr == null)
        {
          continue;
        }
        else
        {
          backendID = idAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      //    Get the backend class name attribute from the entry.  If there isn't
      //    one, then just skip the entry.
      String backendClassName = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
        StringConfigAttribute classStub =
          new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
              true, false, false);
        StringConfigAttribute classAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
        if (classAttr == null)
        {
          continue;
        }
        else
        {
          backendClassName = classAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Class backendClass = null;
      try
      {
        backendClass = Class.forName(backendClassName);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Backend backend = null;
      try
      {
        backend = (Backend) backendClass.newInstance();
        backend.setBackendID(backendID);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      // Get the base DN attribute from the entry.  If there isn't one, then
      // just skip this entry.
      List<DN> baseDNs = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
        DNConfigAttribute baseDNStub =
          new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
              true, true, true);
        DNConfigAttribute baseDNAttr =
          (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
        if (baseDNAttr == null)
        {
          msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND;
          String message = getMessage(msgID,
              String.valueOf(configEntry.getDN()));
          throw new DirectoryException(
              DirectoryServer.getServerErrorResultCode(), message,msgID, null);
        }
        else
        {
          baseDNs = baseDNAttr.activeValues();
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      backendList.add(backend);
      entryList.add(configEntry);
      dnList.add(baseDNs);
    }
    return 0;
      }
  /**
   * Initializes this domain from another source server.
   *
   * @param source The source from which to initialize
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initialize(short source, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.initializeTask = initTask;
    InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
        baseDN, serverId, source);
    // Publish Init request msg
    broker.publish(initializeMsg);
    // .. we expect to receive entries or err after that
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representaing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    short  source = 0;
    Throwable cause = null;
    try
    {
      source = Integer.decode(sourceString).shortValue();
      if (source >= -1)
      {
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        log("Source decoded for import:" + source);
        return source;
      }
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param targetString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeTarget(String targetString)
  throws DirectoryException
  {
    short  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMessage.ALL_SERVERS;
    }
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString).shortValue();
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
      }
      return target;
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  private synchronized void acquireIEContext()
  throws DirectoryException
  {
    if (ieContext != null)
    {
      // Rejects 2 simultaneous exports
      int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
      String message = getMessage(msgID);
      throw new DirectoryException(ResultCode.OTHER,
          message, msgID);
    }
    ieContext = new IEContext();
  }
  private synchronized void releaseIEContext()
  {
    ieContext = null;
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument.
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, Task initTask)
  throws DirectoryException
  {
    initializeTarget(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization has been
   * initiated by another server than this one.
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.exportTarget = target;
    ieContext.initializeTask = initTask;
    ieContext.initTaskCounters(backend.getEntryCount());
    // Send start message
    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
        baseDN, serverId, ieContext.exportTarget, requestorID,
        ieContext.entryLeftCount);
    log("SD : publishes " + initializeMessage +
        " for #entries=" + ieContext.entryCount);
    broker.publish(initializeMessage);
    // make an export and send entries
    exportBackend();
    // Successfull termnation
    DoneMessage doneMsg = new DoneMessage(serverId,
      initializeMessage.getDestination());
    broker.publish(doneMsg);
    if (ieContext != null)
    {
      ieContext.updateTaskCompletionState();
      ieContext = null;
    }
  }
  /**
   * Process backend before import.
   * @param backend The backend.
   * @param backendConfigEntry The config entry of the backend.
   * @throws Exception
   */
  private void preBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws Exception
  {
    // Stop saving state
    stateSavingDisabled = true;
    // Clear the backend
    clearJEBackend(false,backend.getBackendID(),null);
    // FIXME setBackendEnabled should be part of TaskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, false);
    // Acquire an exclusive lock for the backend.
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      throw new DirectoryException(ResultCode.OTHER, message, msgID);
    }
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void importBackend(InitializeTargetMessage initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    try
    {
      log("startImport");
      if (initializeMessage.getRequestorID() == serverId)
      {
        // The import responds to a request we did so the IEContext
        // is already acquired
      }
      else
      {
        acquireIEContext();
      }
      ieContext.importSource = initializeMessage.getsenderID();
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initTaskCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend, this.backendConfigEntry);
      DN[] baseDNs = {baseDN};
      ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
      importConfig =
        new LDIFImportConfig(ieContext.ldifImportInputStream);
      importConfig.setIncludeBranches(this.branches);
      // TODO How to deal with rejected entries during the import
      // importConfig.writeRejectedEntries("rejectedImport",
      // ExistingFileBehavior.OVERWRITE);
      // Process import
      this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
      stateSavingDisabled = false;
      // Re-exchange state with SS
      broker.stop();
      broker.start(changelogServers);
    }
    catch(Exception e)
    {
      throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
          2);// FIXME
    }
    finally
    {
      // Cleanup
      importConfig.close();
      // Re-enable backend
      closeBackendImport(this.backend, this.backendConfigEntry);
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      {
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
      }
      releaseIEContext();
      log("End importBackend");
    }
    // Success
  }
  /**
   * Make post import operations.
   * @param backend The backend implied in the import.
   * @param backendConfigEntry The config entry of the backend.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void closeBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws DirectoryException
  {
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    // Release lock
    if (!LockFileManager.releaseLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      new DirectoryException(ResultCode.OTHER, message, msgID);
    }
    // FIXME setBackendEnabled should be part taskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, true);
  }
  /**
   * Retrieves a synchronization domain based on the baseDN.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @return The domain retrieved
   * @throws DirectoryException When an error occured.
   */
  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
  throws DirectoryException
  {
    SynchronizationDomain synchronizationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterSynchronization))
      {
        int msgID = LogMessages.MSGID_INVALID_PROVIDER;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      // From the domainDN retrieves the synchronization domain
      SynchronizationDomain sdomain =
        MultimasterSynchronization.findDomain(baseDN, null);
      if (sdomain == null)
      {
        int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
        String message = getMessage(msgID) + " " + baseDN;
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      if (synchronizationDomain != null)
      {
        // Should never happen
        int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      synchronizationDomain = sdomain;
    }
    return synchronizationDomain;
  }
  /**
   * Returns the backend associated to this domain.
   * @return The associated backend.
   */
  public Backend getBackend()
  {
    return backend;
  }
  /**
   * Returns a boolean indiciating if an import or export is currently
   * processed.
   * @return The status
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
  }
  /*
   * <<Total Update
   */
  /**
   * Push the modifications contain the in given parameter has
   * a modification that would happen on a local server.
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,11 +112,16 @@
    /* Read the first 8 bytes containing the packet length */
    int length = 0;
    /* Let's start the stop-watch before waiting on read */
    /* for the heartbeat check to be operationnal        */
    lastReceiveTime = System.currentTimeMillis();
    while (length<8)
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      if (read == -1)
      {
        lastReceiveTime=0;
        throw new IOException("no more data");
      }
      else
@@ -135,8 +140,9 @@
      {
        length += input.read(buffer, length, totalLength - length);
      }
      lastReceiveTime = System.currentTimeMillis();
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return SynchronizationMessage.generateMsg(buffer);
    }
    catch (OutOfMemoryError e)
@@ -159,6 +165,10 @@
   */
  public long getLastReceiveTime()
  {
    if (lastReceiveTime==0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,6 +47,13 @@
  static final byte MSG_TYPE_CHANGELOG_START = 7;
  static final byte MSG_TYPE_WINDOW = 8;
  static final byte MSG_TYPE_HEARTBEAT = 9;
  static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
  static final byte MSG_TYPE_INITIALIZE_TARGET = 11;
  static final byte MSG_TYPE_ENTRY = 12;
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
  /**
   * Return the byte[] representation of this message.
@@ -60,6 +67,11 @@
   * MSG_TYPE_CHANGELOG_START
   * MSG_TYPE_WINDOW
   * MSG_TYPE_HEARTBEAT
   * MSG_TYPE_INITIALIZE
   * MSG_TYPE_INITIALIZE_TARGET
   * MSG_TYPE_ENTRY
   * MSG_TYPE_DONE
   * MSG_TYPE_ERROR
   *
   * @return the byte[] representation of this message.
   */
@@ -107,6 +119,21 @@
      case MSG_TYPE_HEARTBEAT:
        msg = new HeartbeatMessage(buffer);
      break;
      case MSG_TYPE_INITIALIZE_REQUEST:
        msg = new InitializeRequestMessage(buffer);
      break;
      case MSG_TYPE_INITIALIZE_TARGET:
        msg = new InitializeTargetMessage(buffer);
      break;
      case MSG_TYPE_ENTRY:
        msg = new EntryMessage(buffer);
      break;
      case MSG_TYPE_DONE:
        msg = new DoneMessage(buffer);
      break;
      case MSG_TYPE_ERROR:
        msg = new ErrorMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,6 +26,7 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -54,6 +55,8 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ByteStringFactory;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -158,7 +161,11 @@
        }
      }
      catch (Exception e)
      { }
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1);
      }
    }
    return broker;
  }
@@ -221,7 +228,8 @@
        }
      }
      catch (Exception e)
      { }
      {
      }
    }
    return broker;
  }
@@ -231,6 +239,10 @@
   */
  protected void cleanEntries()
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "SynchronizationTestCase/Cleaning entries" , 1);
    DeleteOperation op;
    // Delete entries
    try
@@ -238,6 +250,10 @@
      while (true)
      {
        DN dn = entryList.removeLast();
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "cleaning entry " + dn, 1);
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
@@ -264,8 +280,13 @@
    // WORKAROUND FOR BUG #639 - BEGIN -
    if (mms != null)
    {
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationTestCase/FinalizeSynchronization Provider" , 1);
      DirectoryServer.deregisterSynchronizationProvider(mms);
      mms.finalizeSynchronizationProvider();
      mms = null;
    }
    // WORKAROUND FOR BUG #639 - END -
@@ -303,17 +324,22 @@
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
    if (changeLogEntry!=null)
    {
      DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry.getDN());
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
    entryList.add(synchroServerEntry.getDN());
      entryList.add(changeLogEntry.getDN());
    }
    if (synchroServerEntry!=null)
    {
      // We also have a replicated suffix (synchronization domain)
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
      "Unable to add the synchronized suffix");
      entryList.add(synchroServerEntry.getDN());
    }
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
          + "ds-cfg-changelog-db-directory: changelogDb"+i;
        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
        changelogs[i] = new Changelog(changelogConfig);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,16 +26,18 @@
 */
package org.opends.server.synchronization.protocol;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
import java.util.zip.DataFormatException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -56,8 +58,8 @@
import org.opends.server.types.ObjectClass;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
 * Test the contructors, encoders and decoders of the synchronization
@@ -520,6 +522,104 @@
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
  @Test()
  public void EntryMessageTest() throws Exception
  {
    String taskInitFromS2 = new String(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks\n" +
        "objectclass: top\n" +
        "objectclass: ds-task\n" +
        "objectclass: ds-task-initialize\n" +
        "ds-task-class-name: org.opends.server.tasks.InitializeTask" +
        "ds-task-initialize-domain-dn: dc=example,dc=com" +
        "ds-task-initialize-source: 1");
    short sender = 1;
    short target = 2;
    byte[] entry = taskInitFromS2.getBytes();
    EntryMessage msg = new EntryMessage(sender, target, entry);
    EntryMessage newMsg = new EntryMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
  }
  /**
   * Test that InitializeRequestMessage encoding and decoding works
   */
  @Test()
  public void InitializeRequestMessageTest() throws Exception
  {
    short sender = 1;
    short target = 2;
    InitializeRequestMessage msg = new InitializeRequestMessage(
        DN.decode("dc=example"), sender, target);
    InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
  }
  /**
   * Test that InitializeTargetMessage encoding and decoding works
   */
  @Test()
  public void InitializeTargetMessageTest() throws Exception
  {
    short senderID = 1;
    short targetID = 2;
    short requestorID = 3;
    long entryCount = 4;
    DN baseDN = DN.decode("dc=example");
    InitializeTargetMessage msg = new InitializeTargetMessage(
        baseDN, senderID, targetID, requestorID, entryCount);
    InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
    assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
    assertEquals(senderID, newMsg.getsenderID());
    assertEquals(targetID, newMsg.getDestination());
    assertEquals(requestorID, newMsg.getRequestorID());
    assertEquals(entryCount, newMsg.getEntryCount());
    assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
  }
  /**
   * Test that DoneMessage encoding and decoding works
   */
  @Test()
  public void DoneMessage() throws Exception
  {
    DoneMessage msg = new DoneMessage((short)1, (short)2);
    DoneMessage newMsg = new DoneMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
  }
  /**
   * Test that ErrorMessage encoding and decoding works
   */
  @Test()
  public void ErrorMessage() throws Exception
  {
    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
    assertEquals(msg.getDetails(), newMsg.getDetails());
  }
  /**
   * Test PendingChange
   */
  private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)