mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

neil_a_wilson
29.12.2007 42b383614d9a4720f6f0c91fb770a389cfa41738
Back out the commit included in revision 1539 because it does not build
properly (it appears to reference classes that are not in the repository).
18 files modified
2079 ■■■■■ changed files
opends/resource/schema/02-config.ldif 33 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/config/ConfigConstants.java 56 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/TaskMessages.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java 374 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java 63 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/common/LogMessages.java 65 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java 1196 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java 27 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 50 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java 112 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -1045,10 +1045,6 @@
  NAME 'ds-cfg-heartbeat-interval'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306
  NAME 'ds-cfg-changelog-db-directory'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307
  NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' )
@@ -1107,22 +1103,6 @@
  NAME 'ds-cfg-case-sensitive-validation' 
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE 
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332
  NAME 'ds-task-initialize-domain-dn'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333
  NAME 'ds-task-initialize-replica-server-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334
  NAME 'ds-task-unprocessed-entry-count'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335
  NAME 'ds-task-processed-entry-count'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
  MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1445,8 +1425,7 @@
  'ds-cfg-synchronization-changelog-server-config' SUP top
  STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
  MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
  ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory )
  X-ORIGIN 'OpenDS Directory Server' )
  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
  SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
  X-ORIGIN 'OpenDS Directory Server' )
@@ -1553,14 +1532,4 @@
  SUP ds-cfg-password-validator STRUCTURAL
  MUST ( ds-cfg-maximum-consecutive-length $ ds-cfg-case-sensitive-validation )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.91
  NAME 'ds-task-initialize-from-remote-replica' SUP ds-task
  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.92
  NAME 'ds-task-initialize-remote-replica' SUP ds-task
  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
  X-ORIGIN 'OpenDS Directory Server' )
opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,62 +3665,6 @@
       NAME_PREFIX_TASK + "import-is-encrypted";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * initialize task definition.
   */
  public static final String OC_INITIALIZE_TASK =
    NAME_PREFIX_TASK + "initialize-from-remote-replica";
  /**
   * The name of the attribute in an initialize task definition that specifies
   * the base dn related to the synchonization domain to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN =
       NAME_PREFIX_TASK + "initialize-domain-dn";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the source in terms of source server from which to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_SOURCE =
       NAME_PREFIX_TASK + "initialize-replica-server-id";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * initialize target task definition.
   */
  public static final String OC_INITIALIZE_TARGET_TASK =
    NAME_PREFIX_TASK + "initialize-remote-replica";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the base dn related to the synchonization domain to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN =
       NAME_PREFIX_TASK + "initialize-domain-dn";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE =
       NAME_PREFIX_TASK + "initialize-replica-server-id";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_LEFT =
       NAME_PREFIX_TASK + "unprocessed-entry-count";
  /**
   * The name of the attribute in an initialize target task definition that
   * specifies the scope in terms of servers to initialize.
   */
  public static final String ATTR_TASK_INITIALIZE_DONE =
       NAME_PREFIX_TASK + "processed-entry-count";
  /**
   * The name of the objectclass that will be used for a Directory Server
opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,19 +211,7 @@
  public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17;
  /**
   * The message ID for the message that will be used when an invalid domain
   * base DN is provided as argument to the initialize target task.
   */
  public static final int  MSGID_TASK_INITIALIZE_TARGET_INVALID_DN =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18;
  /**
   * The message ID for the message that will be used when an invalid domain
   * base DN is provided as argument to the initialize task.
   */
  public static final int  MSGID_TASK_INITIALIZE_INVALID_DN =
       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19;
  /**
   * Associates a set of generic messages with the message IDs defined in this
@@ -291,11 +279,6 @@
    registerMessage(MSGID_TASK_LDIFEXPORT_INSUFFICIENT_PRIVILEGES,
                    "You do not have sufficient privileges to initiate an " +
                    "LDIF export.");
    registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN,
                    "Invalid DN provided with the Initialize Target task.");
    registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN,
                    "Invalid DN provided with the Initialize task.");
  }
}
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
  static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,31 +26,26 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import com.sleepycat.je.DatabaseException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import com.sleepycat.je.DatabaseException;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * This class define an in-memory cache that will be used to store
@@ -430,263 +425,148 @@
    }
  }
  /**
   * Retrieves the destination handlers for a routable message.
   * Send back an ack to the server that sent the change.
   *
   * @param msg The message to route.
   * @param senderHandler The handler of the server that published this message.
   * @return The list of destination handlers.
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   */
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
  {
    short serverId = changeNumber.getServerId();
    sendAck(changeNumber, isLDAPserver, serverId);
  }
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      // TODO Import from the "closest server" to be implemented
    }
    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
    {
      if (!senderHandler.isChangelogServer())
      {
        // Send to all changelogServers
        for (ServerHandler destinationHandler : changelogServers.values())
        {
          servers.add(destinationHandler);
        }
      }
      // Send to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
          continue;
        servers.add(destinationHandler);
      }
    }
  /**
   *
   * Send back an ack to a server that sent the change.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param serverId     The identifier of the server from which we
   *                     received the change..
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
                      short serverId)
  {
    ServerHandler handler;
    if (isLDAPserver)
      handler = connectedServers.get(serverId);
    else
    {
      // Destination is one server
      ServerHandler destinationHandler =
        connectedServers.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
      }
      else
      {
        // the targeted server is NOT connected
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(changelogServers.values());
        }
      }
    }
      handler = changelogServers.get(serverId);
    return servers;
    // TODO : check for null handler and log error
    try
    {
      handler.sendAck(changeNumber);
    } catch (IOException e)
    {
      /*
       * An error happened trying the send back an ack to this server.
       * Log an error and close the connection to this server.
       */
      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
      String message = getMessage(msgID, this.toString())
                                  + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      handler.shutdown();
    }
  }
  /**
   * Process an InitializeRequestMessage.
   *
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
   * the message.
   * Shutdown this ChangelogCache.
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  public void shutdown()
  {
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
    if (servers.isEmpty())
    // Close session with other changelogs
    for (ServerHandler serverHandler : changelogServers.values())
    {
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
        }
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        }
      }
      return;
      serverHandler.shutdown();
    }
    for (ServerHandler targetHandler : servers)
    // Close session with other LDAP servers
    for (ServerHandler serverHandler : connectedServers.values())
    {
      try
      {
        targetHandler.send(msg);
      }
      catch(IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
      }
      serverHandler.shutdown();
    }
    // Shutdown the dbHandlers
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      {
        dbHandler.shutdown();
      }
      sourceDbHandlers.clear();
    }
  }
    /**
     * Send back an ack to the server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
  /**
   * Returns the ServerState describing the last change from this replica.
   *
   * @return The ServerState describing the last change from this replica.
   */
  public ServerState getDbServerState()
  {
    ServerState serverState = new ServerState();
    for (DbHandler db : sourceDbHandlers.values())
    {
      short serverId = changeNumber.getServerId();
      sendAck(changeNumber, isLDAPserver, serverId);
      serverState.update(db.getLastChange());
    }
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    return "ChangelogCache " + baseDn;
  }
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    {
      handler.checkWindow();
    }
    /**
     *
     * Send back an ack to a server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     * @param serverId     The identifier of the server from which we
     *                     received the change..
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
        short serverId)
    for (ServerHandler handler : connectedServers.values())
    {
      ServerHandler handler;
      if (isLDAPserver)
        handler = connectedServers.get(serverId);
      else
        handler = changelogServers.get(serverId);
      // TODO : check for null handler and log error
      try
      {
        handler.sendAck(changeNumber);
      } catch (IOException e)
      {
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        handler.shutdown();
      }
      handler.checkWindow();
    }
  }
    /**
     * Shutdown this ChangelogCache.
     */
    public void shutdown()
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    {
      // Close session with other changelogs
      for (ServerHandler serverHandler : changelogServers.values())
      {
        serverHandler.shutdown();
      }
      // Close session with other LDAP servers
      for (ServerHandler serverHandler : connectedServers.values())
      {
        serverHandler.shutdown();
      }
      // Shutdown the dbHandlers
      synchronized (sourceDbHandlers)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        {
          dbHandler.shutdown();
        }
        sourceDbHandlers.clear();
      }
    }
    /**
     * Returns the ServerState describing the last change from this replica.
     *
     * @return The ServerState describing the last change from this replica.
     */
    public ServerState getDbServerState()
    {
      ServerState serverState = new ServerState();
      for (DbHandler db : sourceDbHandlers.values())
      {
        serverState.update(db.getLastChange());
      }
      return serverState;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      return "ChangelogCache " + baseDn;
    }
    /**
     * Check if some server Handler should be removed from flow control state.
     * @throws IOException If an error happened.
     */
    public void checkAllSaturation() throws IOException
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.checkWindow();
      }
      for (ServerHandler handler : connectedServers.values())
      {
        handler.checkWindow();
      }
    }
    /**
     * Check if a server that was in flow control can now restart
     * sending updates.
     * @param sourceHandler The server that must be checked.
     * @return true if the server can restart sending changes.
     *         false if the server can't restart sending changes.
     */
    public boolean restartAfterSaturation(ServerHandler sourceHandler)
    {
      for (ServerHandler handler : changelogServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
      }
      for (ServerHandler handler : connectedServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
      }
      return true;
    }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    return true;
  }
}
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,8 +27,6 @@
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -46,18 +44,6 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -65,6 +51,17 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -111,7 +108,6 @@
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private short changelogId;
  /**
   * The time in milliseconds between heartbeats from the synchronization
@@ -159,7 +155,6 @@
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  {
    this.changelogId = changelogId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
@@ -1268,40 +1263,4 @@
  {
    return heartbeatInterval;
  }
  /**
   * Processes a routable message.
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
    changelogCache.process(msg, this);
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
    session.publish(msg);
  }
}
opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,11 +34,6 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
@@ -121,33 +116,6 @@
          WindowMessage windowMsg = (WindowMessage) msg;
          handler.updateWindow(windowMsg);
        }
        else if (msg instanceof InitializeRequestMessage)
        {
          InitializeRequestMessage initializeMsg =
            (InitializeRequestMessage) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof InitializeTargetMessage)
        {
          InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof EntryMessage)
        {
          EntryMessage entryMsg = (EntryMessage) msg;
          handler.process(entryMsg);
        }
        else if (msg instanceof DoneMessage)
        {
          DoneMessage doneMsg = (DoneMessage) msg;
          handler.process(doneMsg);
        }
        else if (msg instanceof ErrorMessage)
        {
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
      }
    } catch (IOException e)
    {
opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,61 +317,13 @@
    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
  /**
   * The message id for the description of the attribute used to configure
   * The message id for thedescription of the attribute used to configure
   * the purge delay of the Changelog Servers.
   */
  public static final int MSGID_PURGE_DELAY_ATTR =
    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
  /**
   * The message id for the error raised when export/import
   * is rejected due to an export/import already in progress.
   */
  public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44;
  /**
   * The message id for the error raised when import
   * is rejected due to an invalid source of data imported.
   */
  public static final int MSGID_INVALID_IMPORT_SOURCE =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45;
  /**
   * The message id for the error raised when export
   * is rejected due to an invalid target to export datas.
   */
  public static final int MSGID_INVALID_EXPORT_TARGET =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46;
  /**
   * The message id for the error raised when import/export message
   * cannot be routed to an up-and-running target in the domain.
   */
  public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47;
  /**
   * The message ID for the message that will be used when no domain
   * can be found matching the provided domain base DN.
   */
  public static final int  MSGID_NO_MATCHING_DOMAIN =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48;
  /**
   * The message ID for the message that will be used when no domain
   * can be found matching the provided domain base DN.
   */
  public static final int  MSGID_MULTIPLE_MATCHING_DOMAIN
       = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49;
  /**
   * The message ID for the message that will be used when the domain
   * belongs to a provider class that does not allow the export.
   */
  public static final int  MSGID_INVALID_PROVIDER =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50;
  /**
   * Register the messages from this class in the core server.
@@ -497,20 +449,5 @@
        " restored because changelog servers would not be able to refresh" +
        " LDAP servers with older versions of the data. A zero value" +
        " can be used to specify an infinite delay (or never purge).");
    MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
        "The current request is rejected due to an import or an export" +
        " already in progress for the same data.");
    MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE,
        "Invalid source for the import.");
    MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET,
        "Invalid target for the export.");
    MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
        "No reachable peer in the domain.");
    MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN,
        "No domain matches the base DN provided.");
    MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN,
        "Multiple domains match the base DN provided.");
    MessageHandler.registerMessage(MSGID_INVALID_PROVIDER,
        "The provider class does not allow the operation requested.");
  }
}
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,8 +27,6 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -347,10 +345,6 @@
          {
            if (session != null)
            {
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  "Broker : connect closing session" , 1);
              session.close();
              session = null;
            }
@@ -504,7 +498,6 @@
      try
      {
        SynchronizationMessage msg = session.receive();
        if (msg instanceof WindowMessage)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
@@ -551,11 +544,6 @@
    connected = false;
    try
    {
      if (debugEnabled())
      {
        debugInfo("ChangelogBroker Stop Closing session");
      }
      session.close();
    } catch (IOException e)
    {}
@@ -694,12 +682,4 @@
  {
    return numLostConnections;
  }
  private void log(String message)
  {
    int    msgID   = MSGID_UNKNOWN_TYPE;
    logError(ErrorLogCategory.SYNCHRONIZATION,
           ErrorLogSeverity.SEVERE_ERROR,
           message, msgID);
  }
}
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,13 +27,11 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import java.io.IOException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import java.io.IOException;
/**
 * This class implements a thread to monitor heartbeat messages from the
@@ -105,9 +103,6 @@
        long lastReceiveTime = session.getLastReceiveTime();
        if (now > lastReceiveTime + 2 * heartbeatInterval)
        {
          debugInfo("Heartbeat monitor is closing the broker session " +
          "because it could not detect a heartbeat.");
          // Heartbeat is well overdue so the server is assumed to be dead.
          if (debugEnabled())
          {
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@
   *             Can be null is the request has no associated operation.
   * @return     The Synchronization domain for this DN.
   */
  public static SynchronizationDomain findDomain(DN dn, Operation op)
  private static SynchronizationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special synchronization code on Operation that are
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,8 +233,7 @@
    {
      int msgID = MSGID_ERROR_UPDATING_RUV;
      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
          op.toString(), op.getErrorMessage(), baseDn.toString(),
          Thread.currentThread().getStackTrace());
          op.toString(), op.getErrorMessage(), baseDn.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,59 +26,47 @@
 */
package org.opends.server.synchronization.plugin;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.ConfigMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_ABBR;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_FULL;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -89,30 +77,19 @@
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -160,96 +137,8 @@
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  short serverId;
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   */
  private class IEContext
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    SynchroLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMessage.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry left to be processed
    long entryLeftCount = 0;
    // The exception raised when any
    DirectoryException exception = null;
    /**
     * Initializes the counters of the task with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initTaskCounters(long count)
    {
      entryCount = count;
      entryLeftCount = count;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
      }
    }
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     */
    public void updateTaskCounters()
    {
      entryLeftCount--;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
        }
      }
    }
    /**
     * Update the state of the task.
     */
    protected TaskState updateTaskCompletionState()
    {
      if (exception == null)
        return TaskState.COMPLETED_SUCCESSFULLY;
      else
        return TaskState.STOPPED_BY_ERROR;
    }
  }
  // The context related to an import or export being processed
  // Null when none is being processed.
  private IEContext ieContext = null;
  // The backend informations necessary to make an import or export.
  private Backend backend;
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private short serverId;
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
@@ -271,7 +160,6 @@
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -317,6 +205,8 @@
    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
  }
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   *
@@ -327,7 +217,6 @@
  public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
  {
    super("Synchronization flush");
    /*
     * read the centralized changelog server configuration
     * this is a multivalued attribute
@@ -508,10 +397,6 @@
        if (!receiveStatus)
          broker.suspendReceive();
      }
      // Retrieves the related backend and its config entry
      retrievesBackendInfos(baseDN);
    } catch (Exception e)
    {
     /* TODO should mark that changelog service is
@@ -918,9 +803,9 @@
  }
  /**
   * Receives an update message from the changelog.
   * Receive an update message from the changelog.
   * also responsible for updating the list of pending changes
   * @return the received message - null if none
   * @return the received message
   */
  public UpdateMessage receive()
  {
@@ -938,7 +823,7 @@
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
@@ -949,56 +834,6 @@
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
            }
            catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
            }
            catch(DirectoryException de)
            {
              // Return an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  changelog did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
@@ -1041,9 +876,7 @@
  public void receiveAck(AckMessage ack)
  {
    UpdateMessage update;
    ChangeNumber changeNumber;
    changeNumber = ack.getChangeNumber();
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    {
@@ -1272,7 +1105,7 @@
        synchronized (this)
        {
          this.wait(1000);
          if (!disabled && !stateSavingDisabled )
          if (!disabled )
          {
            // save the RUV
            state.save();
@@ -1318,8 +1151,6 @@
      this.notify();
    }
    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
    // stop the ChangelogBroker
    broker.stop();
  }
@@ -2026,7 +1857,6 @@
    return broker.getNumLostConnections();
  }
  /**
   * Check if the domain solve conflicts.
   *
@@ -2103,988 +1933,6 @@
    // Nothing is needed at the moment
  }
  /*
   * Total Update >>
   */
  /**
   * Receives bytes related to an entry in the context of an import to
   * initialize the domain (called by SynchronizationDomainLDIFInputStream).
   *
   * @return The bytes. Null when the Done or Err message has been received
   */
  public byte[] receiveEntryBytes()
  {
    SynchronizationMessage msg;
    while (true)
    {
      try
      {
        msg = broker.receive();
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        log("receiveEntryBytes: received " + msg);
        if (msg instanceof EntryMessage)
        {
          // FIXME
          EntryMessage entryMsg = (EntryMessage)msg;
          byte[] entryBytes = entryMsg.getEntryBytes().clone();
          ieContext.updateTaskCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          return null;
        }
        else if (msg instanceof ErrorMessage)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ieContext.exception = new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails() , errorMsg.getMsgID());
          return null;
        }
        else
        {
          // Other messages received during an import are trashed
        }
      }
      catch(Exception e)
      {
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            "received an unexpected message type" , 1, e);
      }
      return null;
    }
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * @param errorMsg The error message received.
   */
  protected void abandonImportExport(ErrorMessage errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
          errorMsg.getDetails() , errorMsg.getMsgID());
      if (ieContext.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
        ieContext = null;
      }
    }
  }
  /**
   * Clears all the entries from the JE backend determined by the
   * be id passed into the method.
   *
   * @param  createBaseEntry  Indicate whether to automatically create the base
   *                          entry and add it to the backend.
   * @param beID  The be id to clear.
   * @param dn   The suffix of the backend to create if the the createBaseEntry
   *             boolean is true.
   * @throws Exception  If an unexpected problem occurs.
   */
  public static void clearJEBackend(boolean createBaseEntry, String beID,
      String dn) throws Exception
  {
    BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
    DN[] baseDNs = backend.getBaseDNs();
    // FIXME Should getConfigEntry be part of TaskUtils ?
    ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
    // FIXME Should setBackendEnabled be part of TaskUtils ?
    TaskUtils.setBackendEnabled(configEntry, false);
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
      {
        throw new RuntimeException(failureReason.toString());
      }
      try
      {
        backend.clearBackend(configEntry, baseDNs);
      }
      finally
      {
        LockFileManager.releaseLock(lockFile, failureReason);
      }
    }
    finally
    {
      TaskUtils.setBackendEnabled(configEntry, true);
    }
    if (createBaseEntry)
    {
      DN baseDN = DN.decode(dn);
      Entry e = createEntry(baseDN);
      backend = (BackendImpl)DirectoryServer.getBackend(beID);
      backend.addEntry(e, null);
    }
  }
  /**
   * Log debug message.
   * @param message The message to log.
   */
  private void log(String message)
  {
    if (debugEnabled())
    {
      debugInfo("DebugInfo" + message);
      int    msgID   = MSGID_UNKNOWN_TYPE;
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationDomain/ " + message, msgID);
    }
  }
  /**
   * Export the entries.
   * @throws DirectoryException when an error occured
   */
  protected void exportBackend() throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            String.valueOf(failureReason));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message + " " + stackTraceToSingleLineString(e), msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    //  Launch the export.
    try
    {
      DN[] baseDNs = {this.baseDN};
      backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, de.getErrorMessage());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, stackTraceToSingleLineString(e));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    finally
    {
      //  Clean up after the export by closing the export config.
      exportConfig.close();
      //  Release the shared lock on the backend.
      try
      {
        String lockFile = LockFileManager.getBackendLockFileName(backend);
        StringBuilder failureReason = new StringBuilder();
        if (! LockFileManager.releaseLock(lockFile, failureReason))
        {
          int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
          String message = getMessage(msgID, backend.getBackendID(),
              String.valueOf(failureReason));
          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
              message, msgID);
          throw new DirectoryException(
              ResultCode.OTHER, message, msgID, null);
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            stackTraceToSingleLineString(e));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
  }
  /**
   * Retrieves the backend object related to the domain and the backend's
   * config entry. They will be used for import and export.
   * TODO This should be in a shared package rather than here.
   *
   * @param baseDN The baseDN to retrieve the backend
   * @throws DirectoryException when an error occired
   */
  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
  {
    ArrayList<Backend>     backendList = new ArrayList<Backend>();
    ArrayList<ConfigEntry> entryList   = new ArrayList<ConfigEntry>();
    ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
    Backend backend = null;
    ConfigEntry backendConfigEntry = null;
    List<DN> branches = new ArrayList<DN>(0);
    // Retrieves the backend related to this domain
    Backend domainBackend = DirectoryServer.getBackend(baseDN);
    if (domainBackend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    // Retrieves its config entry and its DNs
    int code = getBackends(backendList, entryList, dnList);
    if (code != 0)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    int numBackends = backendList.size();
    for (int i=0; i < numBackends; i++)
    {
      Backend b = backendList.get(i);
      if (domainBackend.getBackendID() != b.getBackendID())
      {
        continue;
      }
      if (backend == null)
      {
        backend = domainBackend;
        backendConfigEntry = entryList.get(i).duplicate();
        branches = dnList.get(i);
      }
      else
      {
        int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
        String message = getMessage(msgID, domainBackend.getBackendID());
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    if (backend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
      String message = getMessage(msgID, domainBackend.getBackendID());
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    else if (! backend.supportsLDIFExport())
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_IMPORT;
      String message = getMessage(msgID, 0); // FIXME
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    this.backend = backend;
    this.backendConfigEntry = backendConfigEntry;
    this.branches = branches;
  }
  /**
   * Sends lDIFEntry entry lines to the export target currently set.
   *
   * @param lDIFEntry The lines for the LDIF entry.
   * @throws IOException when an error occured.
   */
  public void sendEntryLines(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // we just let down the export.
    if (ieContext.exception != null)
    {
      IOException ioe = new IOException(ieContext.exception.getMessage());
      ieContext = null;
      throw ioe;
    }
    // new entry then send the current one
    EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    ieContext.updateTaskCounters();
  }
  /**
   * Retrieves information about the backends defined in the Directory Server
   * configuration.
   *
   * @param  backendList  A list into which instantiated (but not initialized)
   *                      backend instances will be placed.
   * @param  entryList    A list into which the config entries associated with
   *                      the backends will be placed.
   * @param  dnList       A list into which the set of base DNs for each backend
   *                      will be placed.
   */
  private static int getBackends(ArrayList<Backend> backendList,
                                 ArrayList<ConfigEntry> entryList,
                                 ArrayList<List<DN>> dnList)
  throws DirectoryException
  {
    //  Get the base entry for all backend configuration.
    DN backendBaseDN = null;
    try
    {
      backendBaseDN = DN.decode(DN_BACKEND_BASE);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    ConfigEntry baseEntry = null;
    try
    {
      baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
    }
    catch (ConfigException ce)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    //  Iterate through the immediate children, attempting to parse them as
    //  backends.
    for (ConfigEntry configEntry : baseEntry.getChildren().values())
    {
      // Get the backend ID attribute from the entry.  If there isn't one, then
      // skip the entry.
      String backendID = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
        StringConfigAttribute idStub =
          new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
              true, false, true);
        StringConfigAttribute idAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
        if (idAttr == null)
        {
          continue;
        }
        else
        {
          backendID = idAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      //    Get the backend class name attribute from the entry.  If there isn't
      //    one, then just skip the entry.
      String backendClassName = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
        StringConfigAttribute classStub =
          new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
              true, false, false);
        StringConfigAttribute classAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
        if (classAttr == null)
        {
          continue;
        }
        else
        {
          backendClassName = classAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Class backendClass = null;
      try
      {
        backendClass = Class.forName(backendClassName);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Backend backend = null;
      try
      {
        backend = (Backend) backendClass.newInstance();
        backend.setBackendID(backendID);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      // Get the base DN attribute from the entry.  If there isn't one, then
      // just skip this entry.
      List<DN> baseDNs = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
        DNConfigAttribute baseDNStub =
          new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
              true, true, true);
        DNConfigAttribute baseDNAttr =
          (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
        if (baseDNAttr == null)
        {
          msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND;
          String message = getMessage(msgID,
              String.valueOf(configEntry.getDN()));
          throw new DirectoryException(
              DirectoryServer.getServerErrorResultCode(), message,msgID, null);
        }
        else
        {
          baseDNs = baseDNAttr.activeValues();
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      backendList.add(backend);
      entryList.add(configEntry);
      dnList.add(baseDNs);
    }
    return 0;
      }
  /**
   * Initializes this domain from another source server.
   *
   * @param source The source from which to initialize
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initialize(short source, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.initializeTask = initTask;
    InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
        baseDN, serverId, source);
    // Publish Init request msg
    broker.publish(initializeMsg);
    // .. we expect to receive entries or err after that
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representaing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    short  source = 0;
    Throwable cause = null;
    try
    {
      source = Integer.decode(sourceString).shortValue();
      if (source >= -1)
      {
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        log("Source decoded for import:" + source);
        return source;
      }
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param targetString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeTarget(String targetString)
  throws DirectoryException
  {
    short  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMessage.ALL_SERVERS;
    }
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString).shortValue();
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
      }
      return target;
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  private synchronized void acquireIEContext()
  throws DirectoryException
  {
    if (ieContext != null)
    {
      // Rejects 2 simultaneous exports
      int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
      String message = getMessage(msgID);
      throw new DirectoryException(ResultCode.OTHER,
          message, msgID);
    }
    ieContext = new IEContext();
  }
  private synchronized void releaseIEContext()
  {
    ieContext = null;
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument.
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, Task initTask)
  throws DirectoryException
  {
    initializeTarget(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization has been
   * initiated by another server than this one.
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.exportTarget = target;
    ieContext.initializeTask = initTask;
    ieContext.initTaskCounters(backend.getEntryCount());
    // Send start message
    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
        baseDN, serverId, ieContext.exportTarget, requestorID,
        ieContext.entryLeftCount);
    log("SD : publishes " + initializeMessage +
        " for #entries=" + ieContext.entryCount);
    broker.publish(initializeMessage);
    // make an export and send entries
    exportBackend();
    // Successfull termnation
    DoneMessage doneMsg = new DoneMessage(serverId,
      initializeMessage.getDestination());
    broker.publish(doneMsg);
    if (ieContext != null)
    {
      ieContext.updateTaskCompletionState();
      ieContext = null;
    }
  }
  /**
   * Process backend before import.
   * @param backend The backend.
   * @param backendConfigEntry The config entry of the backend.
   * @throws Exception
   */
  private void preBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws Exception
  {
    // Stop saving state
    stateSavingDisabled = true;
    // Clear the backend
    clearJEBackend(false,backend.getBackendID(),null);
    // FIXME setBackendEnabled should be part of TaskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, false);
    // Acquire an exclusive lock for the backend.
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      throw new DirectoryException(ResultCode.OTHER, message, msgID);
    }
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void importBackend(InitializeTargetMessage initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    try
    {
      log("startImport");
      if (initializeMessage.getRequestorID() == serverId)
      {
        // The import responds to a request we did so the IEContext
        // is already acquired
      }
      else
      {
        acquireIEContext();
      }
      ieContext.importSource = initializeMessage.getsenderID();
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initTaskCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend, this.backendConfigEntry);
      DN[] baseDNs = {baseDN};
      ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
      importConfig =
        new LDIFImportConfig(ieContext.ldifImportInputStream);
      importConfig.setIncludeBranches(this.branches);
      // TODO How to deal with rejected entries during the import
      // importConfig.writeRejectedEntries("rejectedImport",
      // ExistingFileBehavior.OVERWRITE);
      // Process import
      this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
      stateSavingDisabled = false;
      // Re-exchange state with SS
      broker.stop();
      broker.start(changelogServers);
    }
    catch(Exception e)
    {
      throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
          2);// FIXME
    }
    finally
    {
      // Cleanup
      importConfig.close();
      // Re-enable backend
      closeBackendImport(this.backend, this.backendConfigEntry);
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      {
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
      }
      releaseIEContext();
      log("End importBackend");
    }
    // Success
  }
  /**
   * Make post import operations.
   * @param backend The backend implied in the import.
   * @param backendConfigEntry The config entry of the backend.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void closeBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws DirectoryException
  {
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    // Release lock
    if (!LockFileManager.releaseLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      new DirectoryException(ResultCode.OTHER, message, msgID);
    }
    // FIXME setBackendEnabled should be part taskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, true);
  }
  /**
   * Retrieves a synchronization domain based on the baseDN.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @return The domain retrieved
   * @throws DirectoryException When an error occured.
   */
  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
  throws DirectoryException
  {
    SynchronizationDomain synchronizationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterSynchronization))
      {
        int msgID = LogMessages.MSGID_INVALID_PROVIDER;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      // From the domainDN retrieves the synchronization domain
      SynchronizationDomain sdomain =
        MultimasterSynchronization.findDomain(baseDN, null);
      if (sdomain == null)
      {
        int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
        String message = getMessage(msgID) + " " + baseDN;
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      if (synchronizationDomain != null)
      {
        // Should never happen
        int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      synchronizationDomain = sdomain;
    }
    return synchronizationDomain;
  }
  /**
   * Returns the backend associated to this domain.
   * @return The associated backend.
   */
  public Backend getBackend()
  {
    return backend;
  }
  /**
   * Returns a boolean indiciating if an import or export is currently
   * processed.
   * @return The status
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
  }
  /*
   * <<Total Update
   */
  /**
   * Push the modifications contain the in given parameter has
   * a modification that would happen on a local server.
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,16 +112,11 @@
    /* Read the first 8 bytes containing the packet length */
    int length = 0;
    /* Let's start the stop-watch before waiting on read */
    /* for the heartbeat check to be operationnal        */
    lastReceiveTime = System.currentTimeMillis();
    while (length<8)
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      if (read == -1)
      {
        lastReceiveTime=0;
        throw new IOException("no more data");
      }
      else
@@ -140,9 +135,8 @@
      {
        length += input.read(buffer, length, totalLength - length);
      }
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      lastReceiveTime = System.currentTimeMillis();
      return SynchronizationMessage.generateMsg(buffer);
    }
    catch (OutOfMemoryError e)
@@ -165,10 +159,6 @@
   */
  public long getLastReceiveTime()
  {
    if (lastReceiveTime==0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,13 +47,6 @@
  static final byte MSG_TYPE_CHANGELOG_START = 7;
  static final byte MSG_TYPE_WINDOW = 8;
  static final byte MSG_TYPE_HEARTBEAT = 9;
  static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
  static final byte MSG_TYPE_INITIALIZE_TARGET = 11;
  static final byte MSG_TYPE_ENTRY = 12;
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
  /**
   * Return the byte[] representation of this message.
@@ -67,11 +60,6 @@
   * MSG_TYPE_CHANGELOG_START
   * MSG_TYPE_WINDOW
   * MSG_TYPE_HEARTBEAT
   * MSG_TYPE_INITIALIZE
   * MSG_TYPE_INITIALIZE_TARGET
   * MSG_TYPE_ENTRY
   * MSG_TYPE_DONE
   * MSG_TYPE_ERROR
   *
   * @return the byte[] representation of this message.
   */
@@ -119,21 +107,6 @@
      case MSG_TYPE_HEARTBEAT:
        msg = new HeartbeatMessage(buffer);
      break;
      case MSG_TYPE_INITIALIZE_REQUEST:
        msg = new InitializeRequestMessage(buffer);
      break;
      case MSG_TYPE_INITIALIZE_TARGET:
        msg = new InitializeTargetMessage(buffer);
      break;
      case MSG_TYPE_ENTRY:
        msg = new EntryMessage(buffer);
      break;
      case MSG_TYPE_DONE:
        msg = new DoneMessage(buffer);
      break;
      case MSG_TYPE_ERROR:
        msg = new ErrorMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -55,8 +54,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ByteStringFactory;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -161,11 +158,7 @@
        }
      }
      catch (Exception e)
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1);
      }
      { }
    }
    return broker;
  }
@@ -228,8 +221,7 @@
        }
      }
      catch (Exception e)
      {
      }
      { }
    }
    return broker;
  }
@@ -239,10 +231,6 @@
   */
  protected void cleanEntries()
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "SynchronizationTestCase/Cleaning entries" , 1);
    DeleteOperation op;
    // Delete entries
    try
@@ -250,10 +238,6 @@
      while (true)
      {
        DN dn = entryList.removeLast();
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "cleaning entry " + dn, 1);
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
@@ -280,13 +264,8 @@
    // WORKAROUND FOR BUG #639 - BEGIN -
    if (mms != null)
    {
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationTestCase/FinalizeSynchronization Provider" , 1);
      DirectoryServer.deregisterSynchronizationProvider(mms);
      mms.finalizeSynchronizationProvider();
      mms = null;
    }
    // WORKAROUND FOR BUG #639 - END -
@@ -324,22 +303,17 @@
    //
    // Add the changelog server
    if (changeLogEntry!=null)
    {
      DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
      entryList.add(changeLogEntry.getDN());
    }
    if (synchroServerEntry!=null)
    {
      // We also have a replicated suffix (synchronization domain)
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
      "Unable to add the synchronized suffix");
      entryList.add(synchroServerEntry.getDN());
    }
    entryList.add(changeLogEntry.getDN());
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
    entryList.add(synchroServerEntry.getDN());
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
          + "ds-cfg-changelog-db-directory: changelogDb"+i;
          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
        changelogs[i] = new Changelog(changelogConfig);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,18 +26,16 @@
 */
package org.opends.server.synchronization.protocol;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
import java.util.zip.DataFormatException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -58,8 +56,8 @@
import org.opends.server.types.ObjectClass;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.synchronization.protocol.OperationContext.*;
/**
 * Test the contructors, encoders and decoders of the synchronization
@@ -522,104 +520,6 @@
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
  @Test()
  public void EntryMessageTest() throws Exception
  {
    String taskInitFromS2 = new String(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks\n" +
        "objectclass: top\n" +
        "objectclass: ds-task\n" +
        "objectclass: ds-task-initialize\n" +
        "ds-task-class-name: org.opends.server.tasks.InitializeTask" +
        "ds-task-initialize-domain-dn: dc=example,dc=com" +
        "ds-task-initialize-source: 1");
    short sender = 1;
    short target = 2;
    byte[] entry = taskInitFromS2.getBytes();
    EntryMessage msg = new EntryMessage(sender, target, entry);
    EntryMessage newMsg = new EntryMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
  }
  /**
   * Test that InitializeRequestMessage encoding and decoding works
   */
  @Test()
  public void InitializeRequestMessageTest() throws Exception
  {
    short sender = 1;
    short target = 2;
    InitializeRequestMessage msg = new InitializeRequestMessage(
        DN.decode("dc=example"), sender, target);
    InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
  }
  /**
   * Test that InitializeTargetMessage encoding and decoding works
   */
  @Test()
  public void InitializeTargetMessageTest() throws Exception
  {
    short senderID = 1;
    short targetID = 2;
    short requestorID = 3;
    long entryCount = 4;
    DN baseDN = DN.decode("dc=example");
    InitializeTargetMessage msg = new InitializeTargetMessage(
        baseDN, senderID, targetID, requestorID, entryCount);
    InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
    assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
    assertEquals(senderID, newMsg.getsenderID());
    assertEquals(targetID, newMsg.getDestination());
    assertEquals(requestorID, newMsg.getRequestorID());
    assertEquals(entryCount, newMsg.getEntryCount());
    assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
  }
  /**
   * Test that DoneMessage encoding and decoding works
   */
  @Test()
  public void DoneMessage() throws Exception
  {
    DoneMessage msg = new DoneMessage((short)1, (short)2);
    DoneMessage newMsg = new DoneMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
  }
  /**
   * Test that ErrorMessage encoding and decoding works
   */
  @Test()
  public void ErrorMessage() throws Exception
  {
    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
    assertEquals(msg.getDetails(), newMsg.getDetails());
  }
  /**
   * Test PendingChange
   */
  private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)