From 40cef7d36084fbe86d34cfa497628d8972c4c9e7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 29 Mar 2007 17:53:41 +0000
Subject: [PATCH] 

---
 opends/resource/schema/02-config.ldif                                                                            |   33 
 opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java                            | 1194 +++++++++++++++++++++++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java |  112 +++
 opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                                     |    2 
 opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java                                 |    9 
 opends/src/server/org/opends/server/synchronization/common/LogMessages.java                                      |   65 +
 opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java                            |    3 
 opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java                                  |   32 
 opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java                                |  366 ++++++---
 opends/src/server/org/opends/server/messages/TaskMessages.java                                                   |   17 
 opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java                                  |   20 
 opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java                       |    2 
 opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java                                  |   14 
 opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java                         |   27 
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java         |    2 
 opends/src/server/org/opends/server/config/ConfigConstants.java                                                  |   56 +
 opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java                                 |   63 +
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java         |   50 +
 18 files changed, 1,884 insertions(+), 183 deletions(-)

diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index b5791ca..b60575c 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -1045,6 +1045,10 @@
   NAME 'ds-cfg-heartbeat-interval'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306
+  NAME 'ds-cfg-changelog-db-directory'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307
   NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
   USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' )
@@ -1103,6 +1107,22 @@
   NAME 'ds-cfg-case-sensitive-validation' 
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE 
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332
+  NAME 'ds-task-initialize-domain-dn'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333
+  NAME 'ds-task-initialize-replica-server-id'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334
+  NAME 'ds-task-unprocessed-entry-count'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335
+  NAME 'ds-task-processed-entry-count'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
   MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1425,7 +1445,8 @@
   'ds-cfg-synchronization-changelog-server-config' SUP top
   STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
   MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
-  ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
+  ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory )
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
   SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
   X-ORIGIN 'OpenDS Directory Server' )
@@ -1532,4 +1553,14 @@
   SUP ds-cfg-password-validator STRUCTURAL
   MUST ( ds-cfg-maximum-consecutive-length $ ds-cfg-case-sensitive-validation )
   X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.91
+  NAME 'ds-task-initialize-from-remote-replica' SUP ds-task
+  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
+  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
+  X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.92
+  NAME 'ds-task-initialize-remote-replica' SUP ds-task
+  MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
+  MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
+  X-ORIGIN 'OpenDS Directory Server' )  
 
diff --git a/opends/src/server/org/opends/server/config/ConfigConstants.java b/opends/src/server/org/opends/server/config/ConfigConstants.java
index 8eb3ad5..5a666c2 100644
--- a/opends/src/server/org/opends/server/config/ConfigConstants.java
+++ b/opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,6 +3665,62 @@
        NAME_PREFIX_TASK + "import-is-encrypted";
 
 
+  /**
+   * The name of the objectclass that will be used for a Directory Server
+   * initialize task definition.
+   */
+  public static final String OC_INITIALIZE_TASK =
+    NAME_PREFIX_TASK + "initialize-from-remote-replica";
+
+  /**
+   * The name of the attribute in an initialize task definition that specifies
+   * the base dn related to the synchonization domain to initialize.
+   */
+  public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN =
+       NAME_PREFIX_TASK + "initialize-domain-dn";
+
+  /**
+   * The name of the attribute in an initialize target task definition that
+   * specifies the source in terms of source server from which to initialize.
+   */
+  public static final String ATTR_TASK_INITIALIZE_SOURCE =
+       NAME_PREFIX_TASK + "initialize-replica-server-id";
+
+  /**
+   * The name of the objectclass that will be used for a Directory Server
+   * initialize target task definition.
+   */
+  public static final String OC_INITIALIZE_TARGET_TASK =
+    NAME_PREFIX_TASK + "initialize-remote-replica";
+
+  /**
+   * The name of the attribute in an initialize target task definition that
+   * specifies the base dn related to the synchonization domain to initialize.
+   */
+  public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN =
+       NAME_PREFIX_TASK + "initialize-domain-dn";
+
+  /**
+   * The name of the attribute in an initialize target task definition that
+   * specifies the scope in terms of servers to initialize.
+   */
+  public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE =
+       NAME_PREFIX_TASK + "initialize-replica-server-id";
+
+  /**
+   * The name of the attribute in an initialize target task definition that
+   * specifies the scope in terms of servers to initialize.
+   */
+  public static final String ATTR_TASK_INITIALIZE_LEFT =
+       NAME_PREFIX_TASK + "unprocessed-entry-count";
+
+  /**
+   * The name of the attribute in an initialize target task definition that
+   * specifies the scope in terms of servers to initialize.
+   */
+  public static final String ATTR_TASK_INITIALIZE_DONE =
+       NAME_PREFIX_TASK + "processed-entry-count";
+
 
   /**
    * The name of the objectclass that will be used for a Directory Server
diff --git a/opends/src/server/org/opends/server/messages/TaskMessages.java b/opends/src/server/org/opends/server/messages/TaskMessages.java
index 16843e3..7349988 100644
--- a/opends/src/server/org/opends/server/messages/TaskMessages.java
+++ b/opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,7 +211,19 @@
   public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER =
        CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17;
 
+  /**
+   * The message ID for the message that will be used when an invalid domain
+   * base DN is provided as argument to the initialize target task.
+   */
+  public static final int  MSGID_TASK_INITIALIZE_TARGET_INVALID_DN =
+       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18;
 
+  /**
+   * The message ID for the message that will be used when an invalid domain
+   * base DN is provided as argument to the initialize task.
+   */
+  public static final int  MSGID_TASK_INITIALIZE_INVALID_DN =
+       CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19;
 
   /**
    * Associates a set of generic messages with the message IDs defined in this
@@ -279,6 +291,11 @@
     registerMessage(MSGID_TASK_LDIFEXPORT_INSUFFICIENT_PRIVILEGES,
                     "You do not have sufficient privileges to initiate an " +
                     "LDIF export.");
+
+    registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN,
+                    "Invalid DN provided with the Initialize Target task.");
+    registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN,
+                    "Invalid DN provided with the Initialize task.");
   }
 }
 
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 64bf887..531569c 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@
   static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
   static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
   static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
-  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
+  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
   static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
 
 
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index d7e0e2a..51f099e 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
  */
 package org.opends.server.synchronization.changelog;
 
+import static org.opends.server.loggers.Error.logError;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.loggers.Error.logError;
-
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
-import com.sleepycat.je.DatabaseException;
-import org.opends.server.types.DN;
-import org.opends.server.types.ErrorLogCategory;
-import org.opends.server.types.ErrorLogSeverity;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
+import com.sleepycat.je.DatabaseException;
+
 /**
  * This class define an in-memory cache that will be used to store
  * the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
     }
   }
 
-
   /**
-   * Send back an ack to the server that sent the change.
+   * Retrieves the destination handlers for a routable message.
    *
-   * @param changeNumber The ChangeNumber of the change that must be acked.
-   * @param isLDAPserver This boolean indicates if the server that sent the
-   *                     change was an LDAP server or a Changelog server.
+   * @param msg The message to route.
+   * @param senderHandler The handler of the server that published this message.
+   * @return The list of destination handlers.
    */
-  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
+      ServerHandler senderHandler)
   {
-    short serverId = changeNumber.getServerId();
-    sendAck(changeNumber, isLDAPserver, serverId);
-  }
 
-  /**
-   *
-   * Send back an ack to a server that sent the change.
-   *
-   * @param changeNumber The ChangeNumber of the change that must be acked.
-   * @param isLDAPserver This boolean indicates if the server that sent the
-   *                     change was an LDAP server or a Changelog server.
-   * @param serverId     The identifier of the server from which we
-   *                     received the change..
-   */
-  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
-                      short serverId)
-  {
-    ServerHandler handler;
-    if (isLDAPserver)
-      handler = connectedServers.get(serverId);
-    else
-      handler = changelogServers.get(serverId);
+    List<ServerHandler> servers =
+      new ArrayList<ServerHandler>();
 
-    // TODO : check for null handler and log error
-    try
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.SEVERE_ERROR,
+        "getDestinationServers"
+        + " msgDest:" + msg.getDestination() , 1);
+
+    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
     {
-      handler.sendAck(changeNumber);
-    } catch (IOException e)
-    {
-      /*
-       * An error happened trying the send back an ack to this server.
-       * Log an error and close the connection to this server.
-       */
-      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
-      String message = getMessage(msgID, this.toString())
-                                  + stackTraceToSingleLineString(e);
-      logError(ErrorLogCategory.SYNCHRONIZATION,
-               ErrorLogSeverity.SEVERE_ERROR,
-               message, msgID);
-      handler.shutdown();
+      // TODO Import from the "closest server" to be implemented
     }
-  }
-
-  /**
-   * Shutdown this ChangelogCache.
-   */
-  public void shutdown()
-  {
-    // Close session with other changelogs
-    for (ServerHandler serverHandler : changelogServers.values())
+    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
     {
-      serverHandler.shutdown();
-    }
-
-    // Close session with other LDAP servers
-    for (ServerHandler serverHandler : connectedServers.values())
-    {
-      serverHandler.shutdown();
-    }
-
-    // Shutdown the dbHandlers
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
+      if (!senderHandler.isChangelogServer())
       {
-        dbHandler.shutdown();
+        // Send to all changelogServers
+        for (ServerHandler destinationHandler : changelogServers.values())
+        {
+          servers.add(destinationHandler);
+        }
       }
-      sourceDbHandlers.clear();
+
+      // Send to all connected LDAP servers
+      for (ServerHandler destinationHandler : connectedServers.values())
+      {
+        // Don't loop on the sender
+        if (destinationHandler == senderHandler)
+          continue;
+        servers.add(destinationHandler);
+      }
     }
+    else
+    {
+      // Destination is one server
+      ServerHandler destinationHandler =
+        connectedServers.get(msg.getDestination());
+      if (destinationHandler != null)
+      {
+        servers.add(destinationHandler);
+      }
+      else
+      {
+        // the targeted server is NOT connected
+        if (senderHandler.isLDAPserver())
+        {
+          // let's forward to the other changelogs
+          servers.addAll(changelogServers.values());
+        }
+      }
+    }
+
+    return servers;
   }
 
   /**
-   * Returns the ServerState describing the last change from this replica.
+   * Process an InitializeRequestMessage.
    *
-   * @return The ServerState describing the last change from this replica.
+   * @param msg The message received and to be processed.
+   * @param senderHandler The server handler of the server that emitted
+   * the message.
    */
-  public ServerState getDbServerState()
+  public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
-    ServerState serverState = new ServerState();
-    for (DbHandler db : sourceDbHandlers.values())
-    {
-      serverState.update(db.getLastChange());
-    }
-    return serverState;
-  }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String toString()
-  {
-    return "ChangelogCache " + baseDn;
-  }
+    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
 
-  /**
-   * Check if some server Handler should be removed from flow control state.
-   * @throws IOException If an error happened.
-   */
-  public void checkAllSaturation() throws IOException
-  {
-    for (ServerHandler handler : changelogServers.values())
+    if (servers.isEmpty())
     {
-      handler.checkWindow();
+      if (!(msg instanceof InitializeRequestMessage))
+      {
+        // TODO A more elaborated policy is probably needed
+      }
+      else
+      {
+        ErrorMessage errMsg = new ErrorMessage(
+            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+            "serverID:" + msg.getDestination());
+
+        try
+        {
+          senderHandler.send(errMsg);
+        }
+        catch(IOException ioe)
+        {
+          // TODO Handle error properly (sender timeout in addition)
+        }
+      }
+      return;
     }
 
-    for (ServerHandler handler : connectedServers.values())
+    for (ServerHandler targetHandler : servers)
     {
-      handler.checkWindow();
+      try
+      {
+        targetHandler.send(msg);
+      }
+      catch(IOException ioe)
+      {
+        // TODO Handle error properly (sender timeout in addition)
+      }
     }
+
   }
 
-  /**
-   * Check if a server that was in flow control can now restart
-   * sending updates.
-   * @param sourceHandler The server that must be checked.
-   * @return true if the server can restart sending changes.
-   *         false if the server can't restart sending changes.
-   */
-  public boolean restartAfterSaturation(ServerHandler sourceHandler)
-  {
-    for (ServerHandler handler : changelogServers.values())
+    /**
+     * Send back an ack to the server that sent the change.
+     *
+     * @param changeNumber The ChangeNumber of the change that must be acked.
+     * @param isLDAPserver This boolean indicates if the server that sent the
+     *                     change was an LDAP server or a Changelog server.
+     */
+    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
     {
-      if (!handler.restartAfterSaturation(sourceHandler))
+      short serverId = changeNumber.getServerId();
+      sendAck(changeNumber, isLDAPserver, serverId);
+    }
+
+    /**
+     *
+     * Send back an ack to a server that sent the change.
+     *
+     * @param changeNumber The ChangeNumber of the change that must be acked.
+     * @param isLDAPserver This boolean indicates if the server that sent the
+     *                     change was an LDAP server or a Changelog server.
+     * @param serverId     The identifier of the server from which we
+     *                     received the change..
+     */
+    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+        short serverId)
+    {
+      ServerHandler handler;
+      if (isLDAPserver)
+        handler = connectedServers.get(serverId);
+      else
+        handler = changelogServers.get(serverId);
+
+      // TODO : check for null handler and log error
+      try
+      {
+        handler.sendAck(changeNumber);
+      } catch (IOException e)
+      {
+        /*
+         * An error happened trying the send back an ack to this server.
+         * Log an error and close the connection to this server.
+         */
+        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+        String message = getMessage(msgID, this.toString())
+        + stackTraceToSingleLineString(e);
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR,
+            message, msgID);
+        handler.shutdown();
+      }
+    }
+
+    /**
+     * Shutdown this ChangelogCache.
+     */
+    public void shutdown()
+    {
+      // Close session with other changelogs
+      for (ServerHandler serverHandler : changelogServers.values())
+      {
+        serverHandler.shutdown();
+      }
+
+      // Close session with other LDAP servers
+      for (ServerHandler serverHandler : connectedServers.values())
+      {
+        serverHandler.shutdown();
+      }
+
+      // Shutdown the dbHandlers
+      synchronized (sourceDbHandlers)
+      {
+        for (DbHandler dbHandler : sourceDbHandlers.values())
+        {
+          dbHandler.shutdown();
+        }
+        sourceDbHandlers.clear();
+      }
+    }
+
+    /**
+     * Returns the ServerState describing the last change from this replica.
+     *
+     * @return The ServerState describing the last change from this replica.
+     */
+    public ServerState getDbServerState()
+    {
+      ServerState serverState = new ServerState();
+      for (DbHandler db : sourceDbHandlers.values())
+      {
+        serverState.update(db.getLastChange());
+      }
+      return serverState;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString()
+    {
+      return "ChangelogCache " + baseDn;
+    }
+
+    /**
+     * Check if some server Handler should be removed from flow control state.
+     * @throws IOException If an error happened.
+     */
+    public void checkAllSaturation() throws IOException
+    {
+      for (ServerHandler handler : changelogServers.values())
+      {
+        handler.checkWindow();
+      }
+
+      for (ServerHandler handler : connectedServers.values())
+      {
+        handler.checkWindow();
+      }
+    }
+
+    /**
+     * Check if a server that was in flow control can now restart
+     * sending updates.
+     * @param sourceHandler The server that must be checked.
+     * @return true if the server can restart sending changes.
+     *         false if the server can't restart sending changes.
+     */
+    public boolean restartAfterSaturation(ServerHandler sourceHandler)
+    {
+      for (ServerHandler handler : changelogServers.values())
+      {
+        if (!handler.restartAfterSaturation(sourceHandler))
           return false;
-    }
+      }
 
-    for (ServerHandler handler : connectedServers.values())
-    {
-      if (!handler.restartAfterSaturation(sourceHandler))
+      for (ServerHandler handler : connectedServers.values())
+      {
+        if (!handler.restartAfterSaturation(sourceHandler))
           return false;
+      }
+      return true;
     }
-    return true;
-  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 3cee58d..a4e1ae8 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@
 package org.opends.server.synchronization.changelog;
 
 import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -44,6 +46,18 @@
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigEntry;
 import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ChangelogStartMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.ServerStartMessage;
+import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.synchronization.protocol.WindowMessage;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
@@ -51,17 +65,6 @@
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.types.InitializationException;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.ProtocolSession;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
-import org.opends.server.synchronization.protocol.HeartbeatThread;
 import org.opends.server.util.TimeThread;
 
 /**
@@ -108,6 +111,7 @@
                                        // flow controled and should
                                        // be stopped from sending messsages.
   private int saturationCount = 0;
+  private short changelogId;
 
   /**
    * The time in milliseconds between heartbeats from the synchronization
@@ -155,6 +159,7 @@
   public void start(DN baseDn, short changelogId, String changelogURL,
                     int windowSize, Changelog changelog)
   {
+    this.changelogId = changelogId;
     rcvWindowSizeHalf = windowSize/2;
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
@@ -1263,4 +1268,40 @@
   {
     return heartbeatInterval;
   }
+
+  /**
+   * Processes a routable message.
+   *
+   * @param msg The message to be processed.
+   */
+  public void process(RoutableMessage msg)
+  {
+    if (debugEnabled())
+      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.SEVERE_ERROR,
+        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
+
+    changelogCache.process(msg, this);
+  }
+
+  /**
+   * Send an InitializeRequestMessage to the server connected through this
+   * handler.
+   *
+   * @param msg The message to be processed
+   * @throws IOException when raised by the underlying session
+   */
+  public void send(RoutableMessage msg) throws IOException
+  {
+    if (debugEnabled())
+      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.SEVERE_ERROR,
+        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
+
+    session.publish(msg);
+  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
index 6f2e451..2dc7a75 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,6 +34,11 @@
 
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.DoneMessage;
+import org.opends.server.synchronization.protocol.EntryMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.InitializeTargetMessage;
 import org.opends.server.synchronization.protocol.ProtocolSession;
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
 import org.opends.server.synchronization.protocol.UpdateMessage;
@@ -116,6 +121,33 @@
           WindowMessage windowMsg = (WindowMessage) msg;
           handler.updateWindow(windowMsg);
         }
+        else if (msg instanceof InitializeRequestMessage)
+        {
+          InitializeRequestMessage initializeMsg =
+            (InitializeRequestMessage) msg;
+          handler.process(initializeMsg);
+        }
+        else if (msg instanceof InitializeTargetMessage)
+        {
+          InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
+          handler.process(initializeMsg);
+        }
+        else if (msg instanceof EntryMessage)
+        {
+          EntryMessage entryMsg = (EntryMessage) msg;
+          handler.process(entryMsg);
+        }
+        else if (msg instanceof DoneMessage)
+        {
+          DoneMessage doneMsg = (DoneMessage) msg;
+          handler.process(doneMsg);
+        }
+        else if (msg instanceof ErrorMessage)
+        {
+          ErrorMessage errorMsg = (ErrorMessage) msg;
+          handler.process(errorMsg);
+        }
+
       }
     } catch (IOException e)
     {
diff --git a/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index bab9527..d2fc088 100644
--- a/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,13 +317,61 @@
     CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
 
   /**
-   * The message id for thedescription of the attribute used to configure
+   * The message id for the description of the attribute used to configure
    * the purge delay of the Changelog Servers.
    */
   public static final int MSGID_PURGE_DELAY_ATTR =
     CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
 
+  /**
+   * The message id for the error raised when export/import
+   * is rejected due to an export/import already in progress.
+   */
+  public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44;
 
+  /**
+   * The message id for the error raised when import
+   * is rejected due to an invalid source of data imported.
+   */
+  public static final int MSGID_INVALID_IMPORT_SOURCE =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45;
+
+  /**
+   * The message id for the error raised when export
+   * is rejected due to an invalid target to export datas.
+   */
+  public static final int MSGID_INVALID_EXPORT_TARGET =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46;
+
+  /**
+   * The message id for the error raised when import/export message
+   * cannot be routed to an up-and-running target in the domain.
+   */
+  public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47;
+
+  /**
+   * The message ID for the message that will be used when no domain
+   * can be found matching the provided domain base DN.
+   */
+  public static final int  MSGID_NO_MATCHING_DOMAIN =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48;
+
+  /**
+   * The message ID for the message that will be used when no domain
+   * can be found matching the provided domain base DN.
+   */
+  public static final int  MSGID_MULTIPLE_MATCHING_DOMAIN
+       = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49;
+
+
+  /**
+   * The message ID for the message that will be used when the domain
+   * belongs to a provider class that does not allow the export.
+   */
+  public static final int  MSGID_INVALID_PROVIDER =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50;
 
   /**
    * Register the messages from this class in the core server.
@@ -449,5 +497,20 @@
         " restored because changelog servers would not be able to refresh" +
         " LDAP servers with older versions of the data. A zero value" +
         " can be used to specify an infinite delay (or never purge).");
+    MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
+        "The current request is rejected due to an import or an export" +
+        " already in progress for the same data.");
+    MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE,
+        "Invalid source for the import.");
+    MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET,
+        "Invalid target for the export.");
+    MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+        "No reachable peer in the domain.");
+    MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN,
+        "No domain matches the base DN provided.");
+    MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN,
+        "Multiple domains match the base DN provided.");
+    MessageHandler.registerMessage(MSGID_INVALID_PROVIDER,
+        "The provider class does not allow the operation requested.");
   }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index 8fb1917..a985e6b 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,6 +27,8 @@
 package org.opends.server.synchronization.plugin;
 
 import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -345,6 +347,10 @@
           {
             if (session != null)
             {
+              logError(ErrorLogCategory.SYNCHRONIZATION,
+                  ErrorLogSeverity.NOTICE,
+                  "Broker : connect closing session" , 1);
+
               session.close();
               session = null;
             }
@@ -498,6 +504,7 @@
       try
       {
         SynchronizationMessage msg = session.receive();
+
         if (msg instanceof WindowMessage)
         {
           WindowMessage windowMsg = (WindowMessage) msg;
@@ -544,6 +551,11 @@
     connected = false;
     try
     {
+      if (debugEnabled())
+      {
+        debugInfo("ChangelogBroker Stop Closing session");
+      }
+
       session.close();
     } catch (IOException e)
     {}
@@ -682,4 +694,12 @@
   {
     return numLostConnections;
   }
+
+  private void log(String message)
+  {
+    int    msgID   = MSGID_UNKNOWN_TYPE;
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+           ErrorLogSeverity.SEVERE_ERROR,
+           message, msgID);
+  }
 }
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java b/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
index b0bc862..670db40 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,12 +27,14 @@
 
 package org.opends.server.synchronization.plugin;
 
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.synchronization.protocol.ProtocolSession;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+
 import java.io.IOException;
 
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+
 /**
  * This class implements a thread to monitor heartbeat messages from the
  * synchronization server.  Each broker runs one of these threads.
@@ -103,6 +105,9 @@
         long lastReceiveTime = session.getLastReceiveTime();
         if (now > lastReceiveTime + 2 * heartbeatInterval)
         {
+          debugInfo("Heartbeat monitor is closing the broker session " +
+          "because it could not detect a heartbeat.");
+
           // Heartbeat is well overdue so the server is assumed to be dead.
           if (debugEnabled())
           {
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java b/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
index e71fd3b..edbb868 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@
    *             Can be null is the request has no associated operation.
    * @return     The Synchronization domain for this DN.
    */
-  private static SynchronizationDomain findDomain(DN dn, Operation op)
+  public static SynchronizationDomain findDomain(DN dn, Operation op)
   {
     /*
      * Don't run the special synchronization code on Operation that are
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
index c0d02fd..1753d0e 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,7 +233,8 @@
     {
       int msgID = MSGID_ERROR_UPDATING_RUV;
       String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
-          op.toString(), op.getErrorMessage(), baseDn.toString());
+          op.toString(), op.getErrorMessage(), baseDn.toString(),
+          Thread.currentThread().getStackTrace());
       logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
           message, msgID);
     }
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 7b2e078..11dea9d 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,47 +26,59 @@
  */
 package org.opends.server.synchronization.plugin;
 
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.util.ServerConstants.
-     TIME_UNIT_MILLISECONDS_ABBR;
-import static org.opends.server.util.ServerConstants.
-     TIME_UNIT_MILLISECONDS_FULL;
-import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
-import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
+import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import static org.opends.server.messages.ConfigMessages.*;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.messages.ToolMessages.*;
 import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.synchronization.plugin.Historical.*;
+import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
 import static org.opends.server.synchronization.protocol.OperationContext.*;
-import static org.opends.server.loggers.Error.*;
-import static org.opends.server.messages.MessageHandler.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.createEntry;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
+import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.LinkedHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
+import org.opends.server.api.Backend;
 import org.opends.server.api.ConfigurableComponent;
 import org.opends.server.api.DirectoryThread;
+import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.backends.jeb.BackendImpl;
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
 import org.opends.server.config.BooleanConfigAttribute;
 import org.opends.server.config.ConfigAttribute;
 import org.opends.server.config.ConfigEntry;
 import org.opends.server.config.ConfigException;
 import org.opends.server.config.DNConfigAttribute;
 import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.StringConfigAttribute;
 import org.opends.server.config.IntegerWithUnitConfigAttribute;
+import org.opends.server.config.StringConfigAttribute;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.LockFileManager;
 import org.opends.server.core.ModifyDNOperation;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.Operation;
 import org.opends.server.messages.MessageHandler;
+import org.opends.server.synchronization.common.LogMessages;
 import org.opends.server.protocols.asn1.ASN1Exception;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -77,19 +89,30 @@
 import org.opends.server.synchronization.protocol.AckMessage;
 import org.opends.server.synchronization.protocol.AddContext;
 import org.opends.server.synchronization.protocol.DeleteContext;
+import org.opends.server.synchronization.protocol.DoneMessage;
+import org.opends.server.synchronization.protocol.EntryMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.InitializeTargetMessage;
 import org.opends.server.synchronization.protocol.ModifyContext;
 import org.opends.server.synchronization.protocol.ModifyDNMsg;
 import org.opends.server.synchronization.protocol.ModifyDnContext;
 import org.opends.server.synchronization.protocol.OperationContext;
+import org.opends.server.synchronization.protocol.RoutableMessage;
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
 import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.tasks.InitializeTargetTask;
+import org.opends.server.tasks.InitializeTask;
+import org.opends.server.tasks.TaskUtils;
 import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DN;
 import org.opends.server.types.DereferencePolicy;
+import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.Modification;
 import org.opends.server.types.RDN;
 import org.opends.server.types.ResultCode;
@@ -137,8 +160,96 @@
    * server.  Zero means heartbeats are off.
    */
   private long heartbeatInterval = 0;
+  short serverId;
 
-  private short serverId;
+  /**
+   * This class contain the context related to an import or export
+   * launched on the domain.
+   */
+  private class IEContext
+  {
+    // The task that initiated the operation.
+    Task initializeTask;
+    // The input stream for the import
+    SynchroLDIFInputStream ldifImportInputStream = null;
+    // The target in the case of an export
+    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
+    // The source in the case of an import
+    short importSource = RoutableMessage.UNKNOWN_SERVER;
+
+    // The total entry count expected to be processed
+    long entryCount = 0;
+    // The count for the entry left to be processed
+    long entryLeftCount = 0;
+
+    // The exception raised when any
+    DirectoryException exception = null;
+
+    /**
+     * Initializes the counters of the task with the provider value.
+     * @param count The value with which to initialize the counters.
+     */
+    public void initTaskCounters(long count)
+    {
+      entryCount = count;
+      entryLeftCount = count;
+
+      if (initializeTask != null)
+      {
+        if (initializeTask instanceof InitializeTask)
+        {
+          ((InitializeTask)initializeTask).setTotal(entryCount);
+          ((InitializeTask)initializeTask).setLeft(entryCount);
+        }
+        else if (initializeTask instanceof InitializeTargetTask)
+        {
+          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
+          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
+        }
+      }
+    }
+
+    /**
+     * Update the counters of the task for each entry processed during
+     * an import or export.
+     */
+    public void updateTaskCounters()
+    {
+      entryLeftCount--;
+
+      if (initializeTask != null)
+      {
+        if (initializeTask instanceof InitializeTask)
+        {
+          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
+        }
+        else if (initializeTask instanceof InitializeTargetTask)
+        {
+          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
+        }
+      }
+    }
+
+    /**
+     * Update the state of the task.
+     */
+    protected TaskState updateTaskCompletionState()
+    {
+      if (exception == null)
+        return TaskState.COMPLETED_SUCCESSFULLY;
+      else
+        return TaskState.STOPPED_BY_ERROR;
+    }
+  }
+
+  // The context related to an import or export being processed
+  // Null when none is being processed.
+  private IEContext ieContext = null;
+
+  // The backend informations necessary to make an import or export.
+  private Backend backend;
+  private ConfigEntry backendConfigEntry;
+  private List<DN> branches = new ArrayList<DN>(0);
 
   private int listenerThreadNumber = 10;
   private boolean receiveStatus = true;
@@ -160,6 +271,7 @@
   private boolean solveConflictFlag = true;
 
   private boolean disabled = false;
+  private boolean stateSavingDisabled = false;
 
   static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -205,8 +317,6 @@
     timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
   }
 
-
-
   /**
    * Creates a new SynchronizationDomain using configuration from configEntry.
    *
@@ -217,6 +327,7 @@
   public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
   {
     super("Synchronization flush");
+
     /*
      * read the centralized changelog server configuration
      * this is a multivalued attribute
@@ -397,6 +508,10 @@
         if (!receiveStatus)
           broker.suspendReceive();
       }
+
+      // Retrieves the related backend and its config entry
+      retrievesBackendInfos(baseDN);
+
     } catch (Exception e)
     {
      /* TODO should mark that changelog service is
@@ -803,9 +918,9 @@
   }
 
   /**
-   * Receive an update message from the changelog.
+   * Receives an update message from the changelog.
    * also responsible for updating the list of pending changes
-   * @return the received message
+   * @return the received message - null if none
    */
   public UpdateMessage receive()
   {
@@ -823,7 +938,7 @@
             // The server is in the shutdown process
             return null;
           }
-
+          log("Broker received message :" + msg);
           if (msg instanceof AckMessage)
           {
             AckMessage ack = (AckMessage) msg;
@@ -834,6 +949,56 @@
             update = (UpdateMessage) msg;
             receiveUpdate(update);
           }
+          else if (msg instanceof InitializeRequestMessage)
+          {
+            // Another server requests us to provide entries
+            // for a total update
+            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
+            try
+            {
+              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+                  null);
+            }
+            catch(DirectoryException de)
+            {
+              // Returns an error message to notify the sender
+              int msgID = de.getErrorMessageID();
+              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
+                  msgID, de.getMessage());
+              broker.publish(errorMsg);
+            }
+          }
+          else if (msg instanceof InitializeTargetMessage)
+          {
+            // Another server is exporting its entries to us
+            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+
+            try
+            {
+              importBackend(initMsg);
+            }
+            catch(DirectoryException de)
+            {
+              // Return an error message to notify the sender
+              int msgID = de.getErrorMessageID();
+              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
+                  msgID, de.getMessage());
+              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
+              broker.publish(errorMsg);
+            }
+          }
+          else if (msg instanceof ErrorMessage)
+          {
+            if (ieContext != null)
+            {
+              // This is an error termination for the 2 following cases :
+              // - either during an export
+              // - or before an import really started
+              //   For example, when we publish a request and the
+              //  changelog did not find any import source.
+              abandonImportExport((ErrorMessage)msg);
+            }
+          }
         } catch (SocketTimeoutException e)
         {
           // just retry
@@ -876,7 +1041,9 @@
   public void receiveAck(AckMessage ack)
   {
     UpdateMessage update;
-    ChangeNumber changeNumber = ack.getChangeNumber();
+    ChangeNumber changeNumber;
+
+    changeNumber = ack.getChangeNumber();
 
     synchronized (pendingChanges)
     {
@@ -1105,7 +1272,7 @@
         synchronized (this)
         {
           this.wait(1000);
-          if (!disabled )
+          if (!disabled && !stateSavingDisabled )
           {
             // save the RUV
             state.save();
@@ -1151,6 +1318,8 @@
       this.notify();
     }
 
+    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
+
     // stop the ChangelogBroker
     broker.stop();
   }
@@ -1857,6 +2026,7 @@
     return broker.getNumLostConnections();
   }
 
+
   /**
    * Check if the domain solve conflicts.
    *
@@ -1933,6 +2103,988 @@
     // Nothing is needed at the moment
   }
 
+  /*
+   * Total Update >>
+   */
+
+  /**
+   * Receives bytes related to an entry in the context of an import to
+   * initialize the domain (called by SynchronizationDomainLDIFInputStream).
+   *
+   * @return The bytes. Null when the Done or Err message has been received
+   */
+  public byte[] receiveEntryBytes()
+  {
+    SynchronizationMessage msg;
+    while (true)
+    {
+      try
+      {
+        msg = broker.receive();
+
+        if (msg == null)
+        {
+          // The server is in the shutdown process
+          return null;
+        }
+        log("receiveEntryBytes: received " + msg);
+        if (msg instanceof EntryMessage)
+        {
+          // FIXME
+          EntryMessage entryMsg = (EntryMessage)msg;
+          byte[] entryBytes = entryMsg.getEntryBytes().clone();
+          ieContext.updateTaskCounters();
+          return entryBytes;
+        }
+        else if (msg instanceof DoneMessage)
+        {
+          // This is the normal termination of the import
+          // No error is stored and the import is ended
+          // by returning null
+          return null;
+        }
+        else if (msg instanceof ErrorMessage)
+        {
+          // This is an error termination during the import
+          // The error is stored and the import is ended
+          // by returning null
+          ErrorMessage errorMsg = (ErrorMessage)msg;
+          ieContext.exception = new DirectoryException(ResultCode.OTHER,
+              errorMsg.getDetails() , errorMsg.getMsgID());
+          return null;
+        }
+        else
+        {
+          // Other messages received during an import are trashed
+        }
+      }
+      catch(Exception e)
+      {
+        ieContext.exception = new DirectoryException(ResultCode.OTHER,
+            "received an unexpected message type" , 1, e);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Processes an error message received while an import/export is
+   * on going.
+   * @param errorMsg The error message received.
+   */
+  protected void abandonImportExport(ErrorMessage errorMsg)
+  {
+    // FIXME TBD Treat the case where the error happens while entries
+    // are being exported
+
+    if (ieContext != null)
+    {
+      ieContext.exception = new DirectoryException(ResultCode.OTHER,
+          errorMsg.getDetails() , errorMsg.getMsgID());
+
+      if (ieContext.initializeTask instanceof InitializeTask)
+      {
+        // Update the task that initiated the import
+        ((InitializeTask)ieContext.initializeTask).
+        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
+
+        ieContext = null;
+      }
+    }
+  }
+
+  /**
+   * Clears all the entries from the JE backend determined by the
+   * be id passed into the method.
+   *
+   * @param  createBaseEntry  Indicate whether to automatically create the base
+   *                          entry and add it to the backend.
+   * @param beID  The be id to clear.
+   * @param dn   The suffix of the backend to create if the the createBaseEntry
+   *             boolean is true.
+   * @throws Exception  If an unexpected problem occurs.
+   */
+  public static void clearJEBackend(boolean createBaseEntry, String beID,
+      String dn) throws Exception
+  {
+    BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
+    DN[] baseDNs = backend.getBaseDNs();
+
+    // FIXME Should getConfigEntry be part of TaskUtils ?
+    ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
+
+    // FIXME Should setBackendEnabled be part of TaskUtils ?
+    TaskUtils.setBackendEnabled(configEntry, false);
+
+    try
+    {
+      String lockFile = LockFileManager.getBackendLockFileName(backend);
+      StringBuilder failureReason = new StringBuilder();
+
+      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
+      {
+        throw new RuntimeException(failureReason.toString());
+      }
+
+      try
+      {
+        backend.clearBackend(configEntry, baseDNs);
+      }
+      finally
+      {
+        LockFileManager.releaseLock(lockFile, failureReason);
+      }
+    }
+    finally
+    {
+      TaskUtils.setBackendEnabled(configEntry, true);
+    }
+
+    if (createBaseEntry)
+    {
+      DN baseDN = DN.decode(dn);
+      Entry e = createEntry(baseDN);
+      backend = (BackendImpl)DirectoryServer.getBackend(beID);
+      backend.addEntry(e, null);
+    }
+  }
+
+  /**
+   * Log debug message.
+   * @param message The message to log.
+   */
+  private void log(String message)
+  {
+    if (debugEnabled())
+    {
+      debugInfo("DebugInfo" + message);
+      int    msgID   = MSGID_UNKNOWN_TYPE;
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+          ErrorLogSeverity.NOTICE,
+          "SynchronizationDomain/ " + message, msgID);
+    }
+  }
+
+  /**
+   * Export the entries.
+   * @throws DirectoryException when an error occured
+   */
+  protected void exportBackend() throws DirectoryException
+  {
+    // FIXME Temporary workaround - will probably be fixed when implementing
+    // dynamic config
+    retrievesBackendInfos(this.baseDN);
+
+    //  Acquire a shared lock for the backend.
+    try
+    {
+      String lockFile = LockFileManager.getBackendLockFileName(backend);
+      StringBuilder failureReason = new StringBuilder();
+      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
+      {
+        int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
+        String message = getMessage(msgID, backend.getBackendID(),
+            String.valueOf(failureReason));
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+            message, msgID);
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+    }
+    catch (Exception e)
+    {
+      int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
+      String message = getMessage(msgID, backend.getBackendID());
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+          message + " " + stackTraceToSingleLineString(e), msgID);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+
+    SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
+
+    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
+
+    //  Launch the export.
+    try
+    {
+      DN[] baseDNs = {this.baseDN};
+      backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
+    }
+    catch (DirectoryException de)
+    {
+      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
+      String message = getMessage(msgID, de.getErrorMessage());
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
+          msgID);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+    catch (Exception e)
+    {
+      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
+      String message = getMessage(msgID, stackTraceToSingleLineString(e));
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
+          msgID);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+    finally
+    {
+      //  Clean up after the export by closing the export config.
+      exportConfig.close();
+
+      //  Release the shared lock on the backend.
+      try
+      {
+        String lockFile = LockFileManager.getBackendLockFileName(backend);
+        StringBuilder failureReason = new StringBuilder();
+        if (! LockFileManager.releaseLock(lockFile, failureReason))
+        {
+          int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
+          String message = getMessage(msgID, backend.getBackendID(),
+              String.valueOf(failureReason));
+          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
+              message, msgID);
+          throw new DirectoryException(
+              ResultCode.OTHER, message, msgID, null);
+        }
+      }
+      catch (Exception e)
+      {
+        int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
+        String message = getMessage(msgID, backend.getBackendID(),
+            stackTraceToSingleLineString(e));
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
+            message, msgID);
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+    }
+  }
+
+  /**
+   * Retrieves the backend object related to the domain and the backend's
+   * config entry. They will be used for import and export.
+   * TODO This should be in a shared package rather than here.
+   *
+   * @param baseDN The baseDN to retrieve the backend
+   * @throws DirectoryException when an error occired
+   */
+  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
+  {
+    ArrayList<Backend>     backendList = new ArrayList<Backend>();
+    ArrayList<ConfigEntry> entryList   = new ArrayList<ConfigEntry>();
+    ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
+
+    Backend backend = null;
+    ConfigEntry backendConfigEntry = null;
+    List<DN> branches = new ArrayList<DN>(0);
+
+    // Retrieves the backend related to this domain
+    Backend domainBackend = DirectoryServer.getBackend(baseDN);
+    if (domainBackend == null)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+      String message = getMessage(msgID, DN_BACKEND_BASE);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+
+    // Retrieves its config entry and its DNs
+    int code = getBackends(backendList, entryList, dnList);
+    if (code != 0)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+      String message = getMessage(msgID, DN_BACKEND_BASE);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+
+    int numBackends = backendList.size();
+    for (int i=0; i < numBackends; i++)
+    {
+      Backend b = backendList.get(i);
+
+      if (domainBackend.getBackendID() != b.getBackendID())
+      {
+        continue;
+      }
+
+      if (backend == null)
+      {
+        backend = domainBackend;
+        backendConfigEntry = entryList.get(i).duplicate();
+        branches = dnList.get(i);
+      }
+      else
+      {
+        int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
+        String message = getMessage(msgID, domainBackend.getBackendID());
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+            message, msgID);
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+    }
+
+    if (backend == null)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
+      String message = getMessage(msgID, domainBackend.getBackendID());
+      logError(ErrorLogCategory.BACKEND,
+          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+    else if (! backend.supportsLDIFExport())
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_IMPORT;
+      String message = getMessage(msgID, 0); // FIXME
+      logError(ErrorLogCategory.BACKEND,
+          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+
+
+    this.backend = backend;
+    this.backendConfigEntry = backendConfigEntry;
+    this.branches = branches;
+  }
+
+
+  /**
+   * Sends lDIFEntry entry lines to the export target currently set.
+   *
+   * @param lDIFEntry The lines for the LDIF entry.
+   * @throws IOException when an error occured.
+   */
+  public void sendEntryLines(String lDIFEntry) throws IOException
+  {
+    // If an error was raised - like receiving an ErrorMessage
+    // we just let down the export.
+    if (ieContext.exception != null)
+    {
+      IOException ioe = new IOException(ieContext.exception.getMessage());
+      ieContext = null;
+      throw ioe;
+    }
+
+    // new entry then send the current one
+    EntryMessage entryMessage = new EntryMessage(
+        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
+    broker.publish(entryMessage);
+
+    ieContext.updateTaskCounters();
+  }
+
+  /**
+   * Retrieves information about the backends defined in the Directory Server
+   * configuration.
+   *
+   * @param  backendList  A list into which instantiated (but not initialized)
+   *                      backend instances will be placed.
+   * @param  entryList    A list into which the config entries associated with
+   *                      the backends will be placed.
+   * @param  dnList       A list into which the set of base DNs for each backend
+   *                      will be placed.
+   */
+  private static int getBackends(ArrayList<Backend> backendList,
+                                 ArrayList<ConfigEntry> entryList,
+                                 ArrayList<List<DN>> dnList)
+  throws DirectoryException
+  {
+    //  Get the base entry for all backend configuration.
+    DN backendBaseDN = null;
+    try
+    {
+      backendBaseDN = DN.decode(DN_BACKEND_BASE);
+    }
+    catch (DirectoryException de)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+      String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+    catch (Exception e)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+      String message = getMessage(msgID, DN_BACKEND_BASE,
+          stackTraceToSingleLineString(e));
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+
+    ConfigEntry baseEntry = null;
+    try
+    {
+      baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
+    }
+    catch (ConfigException ce)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
+      String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+    catch (Exception e)
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
+      String message = getMessage(msgID, DN_BACKEND_BASE,
+          stackTraceToSingleLineString(e));
+      throw new DirectoryException(
+          ResultCode.OTHER, message, msgID, null);
+    }
+
+
+    //  Iterate through the immediate children, attempting to parse them as
+    //  backends.
+    for (ConfigEntry configEntry : baseEntry.getChildren().values())
+    {
+      // Get the backend ID attribute from the entry.  If there isn't one, then
+      // skip the entry.
+      String backendID = null;
+      try
+      {
+        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
+        StringConfigAttribute idStub =
+          new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
+              true, false, true);
+        StringConfigAttribute idAttr =
+          (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
+        if (idAttr == null)
+        {
+          continue;
+        }
+        else
+        {
+          backendID = idAttr.activeValue();
+        }
+      }
+      catch (ConfigException ce)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
+        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+            ce.getMessage());
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+      catch (Exception e)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
+        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+            stackTraceToSingleLineString(e));
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+
+
+      //    Get the backend class name attribute from the entry.  If there isn't
+      //    one, then just skip the entry.
+      String backendClassName = null;
+      try
+      {
+        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
+        StringConfigAttribute classStub =
+          new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
+              true, false, false);
+        StringConfigAttribute classAttr =
+          (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
+        if (classAttr == null)
+        {
+          continue;
+        }
+        else
+        {
+          backendClassName = classAttr.activeValue();
+        }
+      }
+      catch (ConfigException ce)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
+        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+            ce.getMessage());
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+      catch (Exception e)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
+        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+            stackTraceToSingleLineString(e));
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+
+      Class backendClass = null;
+      try
+      {
+        backendClass = Class.forName(backendClassName);
+      }
+      catch (Exception e)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS;
+        String message = getMessage(msgID, backendClassName,
+            String.valueOf(configEntry.getDN()),
+            stackTraceToSingleLineString(e));
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);
+      }
+
+      Backend backend = null;
+      try
+      {
+        backend = (Backend) backendClass.newInstance();
+        backend.setBackendID(backendID);
+      }
+      catch (Exception e)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS;
+        String message = getMessage(msgID, backendClassName,
+            String.valueOf(configEntry.getDN()),
+            stackTraceToSingleLineString(e));
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);      }
+
+
+      // Get the base DN attribute from the entry.  If there isn't one, then
+      // just skip this entry.
+      List<DN> baseDNs = null;
+      try
+      {
+        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
+        DNConfigAttribute baseDNStub =
+          new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
+              true, true, true);
+        DNConfigAttribute baseDNAttr =
+          (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
+        if (baseDNAttr == null)
+        {
+          msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND;
+          String message = getMessage(msgID,
+              String.valueOf(configEntry.getDN()));
+          throw new DirectoryException(
+              DirectoryServer.getServerErrorResultCode(), message,msgID, null);
+        }
+        else
+        {
+          baseDNs = baseDNAttr.activeValues();
+        }
+      }
+      catch (Exception e)
+      {
+        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND;
+        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+            stackTraceToSingleLineString(e));
+        throw new DirectoryException(
+            ResultCode.OTHER, message, msgID, null);      }
+
+
+      backendList.add(backend);
+      entryList.add(configEntry);
+      dnList.add(baseDNs);
+    }
+    return 0;
+      }
+
+  /**
+   * Initializes this domain from another source server.
+   *
+   * @param source The source from which to initialize
+   * @param initTask The task that launched the initialization
+   *                 and should be updated of its progress.
+   * @throws DirectoryException when an error occurs
+   */
+  public void initialize(short source, Task initTask)
+  throws DirectoryException
+  {
+    acquireIEContext();
+    ieContext.initializeTask = initTask;
+
+    InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
+        baseDN, serverId, source);
+
+    // Publish Init request msg
+    broker.publish(initializeMsg);
+
+    // .. we expect to receive entries or err after that
+  }
+
+  /**
+   * Verifies that the given string represents a valid source
+   * from which this server can be initialized.
+   * @param sourceString The string representaing the source
+   * @return The source as a short value
+   * @throws DirectoryException if the string is not valid
+   */
+  public short decodeSource(String sourceString)
+  throws DirectoryException
+  {
+    short  source = 0;
+    Throwable cause = null;
+    try
+    {
+      source = Integer.decode(sourceString).shortValue();
+      if (source >= -1)
+      {
+        // TODO Verifies serverID is in the domain
+        // We shold check here that this is a server implied
+        // in the current domain.
+
+        log("Source decoded for import:" + source);
+        return source;
+      }
+    }
+    catch(Exception e)
+    {
+      cause = e;
+    }
+
+    ResultCode resultCode = ResultCode.OTHER;
+    int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
+    String message = getMessage(errorMessageID);
+
+    if (cause != null)
+      throw new DirectoryException(
+          resultCode, message, errorMessageID, cause);
+    else
+      throw new DirectoryException(
+          resultCode, message, errorMessageID);
+  }
+
+  /**
+   * Verifies that the given string represents a valid source
+   * from which this server can be initialized.
+   * @param targetString The string representing the source
+   * @return The source as a short value
+   * @throws DirectoryException if the string is not valid
+   */
+  public short decodeTarget(String targetString)
+  throws DirectoryException
+  {
+    short  target = 0;
+    Throwable cause;
+    if (targetString.equalsIgnoreCase("all"))
+    {
+      return RoutableMessage.ALL_SERVERS;
+    }
+
+    // So should be a serverID
+    try
+    {
+      target = Integer.decode(targetString).shortValue();
+      if (target >= 0)
+      {
+        // FIXME Could we check now that it is a know server in the domain ?
+      }
+      return target;
+    }
+    catch(Exception e)
+    {
+      cause = e;
+    }
+    ResultCode resultCode = ResultCode.OTHER;
+    int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
+    String message = getMessage(errorMessageID);
+
+    if (cause != null)
+      throw new DirectoryException(
+          resultCode, message, errorMessageID, cause);
+    else
+      throw new DirectoryException(
+          resultCode, message, errorMessageID);
+
+  }
+
+  private synchronized void acquireIEContext()
+  throws DirectoryException
+  {
+    if (ieContext != null)
+    {
+      // Rejects 2 simultaneous exports
+      int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
+      String message = getMessage(msgID);
+      throw new DirectoryException(ResultCode.OTHER,
+          message, msgID);
+    }
+
+    ieContext = new IEContext();
+  }
+
+  private synchronized void releaseIEContext()
+  {
+    ieContext = null;
+  }
+
+  /**
+   * Process the initialization of some other server or servers in the topology
+   * specified by the target argument.
+   * @param target The target that should be initialized
+   * @param initTask The task that triggers this initialization and that should
+   *                 be updated with its progress.
+   * @exception DirectoryException When an error occurs.
+   */
+  public void initializeTarget(short target, Task initTask)
+  throws DirectoryException
+  {
+    initializeTarget(target, serverId, initTask);
+  }
+
+  /**
+   * Process the initialization of some other server or servers in the topology
+   * specified by the target argument when this initialization has been
+   * initiated by another server than this one.
+   * @param target The target that should be initialized.
+   * @param requestorID The server that initiated the export.
+   * @param initTask The task that triggers this initialization and that should
+   *  be updated with its progress.
+   * @exception DirectoryException When an error occurs.
+   */
+  public void initializeTarget(short target, short requestorID, Task initTask)
+  throws DirectoryException
+  {
+    acquireIEContext();
+
+    ieContext.exportTarget = target;
+    ieContext.initializeTask = initTask;
+    ieContext.initTaskCounters(backend.getEntryCount());
+
+    // Send start message
+    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
+        baseDN, serverId, ieContext.exportTarget, requestorID,
+        ieContext.entryLeftCount);
+
+    log("SD : publishes " + initializeMessage +
+        " for #entries=" + ieContext.entryCount);
+
+    broker.publish(initializeMessage);
+
+    // make an export and send entries
+    exportBackend();
+
+    // Successfull termnation
+    DoneMessage doneMsg = new DoneMessage(serverId,
+      initializeMessage.getDestination());
+    broker.publish(doneMsg);
+
+    if (ieContext != null)
+    {
+      ieContext.updateTaskCompletionState();
+      ieContext = null;
+    }
+  }
+
+  /**
+   * Process backend before import.
+   * @param backend The backend.
+   * @param backendConfigEntry The config entry of the backend.
+   * @throws Exception
+   */
+  private void preBackendImport(Backend backend,
+      ConfigEntry backendConfigEntry)
+  throws Exception
+  {
+    // Stop saving state
+    stateSavingDisabled = true;
+
+    // Clear the backend
+    clearJEBackend(false,backend.getBackendID(),null);
+
+    // FIXME setBackendEnabled should be part of TaskUtils ?
+    TaskUtils.setBackendEnabled(backendConfigEntry, false);
+
+    // Acquire an exclusive lock for the backend.
+    String lockFile = LockFileManager.getBackendLockFileName(backend);
+    StringBuilder failureReason = new StringBuilder();
+    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
+      String message = getMessage(msgID, backend.getBackendID(),
+          String.valueOf(failureReason));
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+          message, msgID);
+      throw new DirectoryException(ResultCode.OTHER, message, msgID);
+    }
+  }
+
+  /**
+   * Initializes the domain's backend with received entries.
+   * @param initializeMessage The message that initiated the import.
+   * @exception DirectoryException Thrown when an error occurs.
+   */
+  protected void importBackend(InitializeTargetMessage initializeMessage)
+  throws DirectoryException
+  {
+    LDIFImportConfig importConfig = null;
+    try
+    {
+      log("startImport");
+
+      if (initializeMessage.getRequestorID() == serverId)
+      {
+        // The import responds to a request we did so the IEContext
+        // is already acquired
+      }
+      else
+      {
+        acquireIEContext();
+      }
+
+      ieContext.importSource = initializeMessage.getsenderID();
+      ieContext.entryLeftCount = initializeMessage.getEntryCount();
+      ieContext.initTaskCounters(initializeMessage.getEntryCount());
+
+      preBackendImport(this.backend, this.backendConfigEntry);
+
+      DN[] baseDNs = {baseDN};
+      ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
+      importConfig =
+        new LDIFImportConfig(ieContext.ldifImportInputStream);
+      importConfig.setIncludeBranches(this.branches);
+
+      // TODO How to deal with rejected entries during the import
+      // importConfig.writeRejectedEntries("rejectedImport",
+      // ExistingFileBehavior.OVERWRITE);
+
+      // Process import
+      this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
+
+      stateSavingDisabled = false;
+
+      // Re-exchange state with SS
+      broker.stop();
+      broker.start(changelogServers);
+
+    }
+    catch(Exception e)
+    {
+      throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
+          2);// FIXME
+    }
+    finally
+    {
+      // Cleanup
+      importConfig.close();
+
+      // Re-enable backend
+      closeBackendImport(this.backend, this.backendConfigEntry);
+
+      // Update the task that initiated the import
+      if ((ieContext != null ) && (ieContext.initializeTask != null))
+      {
+        ((InitializeTask)ieContext.initializeTask).
+        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
+      }
+
+      releaseIEContext();
+
+      log("End importBackend");
+    }
+    // Success
+  }
+
+  /**
+   * Make post import operations.
+   * @param backend The backend implied in the import.
+   * @param backendConfigEntry The config entry of the backend.
+   * @exception DirectoryException Thrown when an error occurs.
+   */
+  protected void closeBackendImport(Backend backend,
+      ConfigEntry backendConfigEntry)
+  throws DirectoryException
+  {
+    String lockFile = LockFileManager.getBackendLockFileName(backend);
+    StringBuilder failureReason = new StringBuilder();
+
+    // Release lock
+    if (!LockFileManager.releaseLock(lockFile, failureReason))
+    {
+      int    msgID   = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
+      String message = getMessage(msgID, backend.getBackendID(),
+          String.valueOf(failureReason));
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+          message, msgID);
+      new DirectoryException(ResultCode.OTHER, message, msgID);
+    }
+
+    // FIXME setBackendEnabled should be part taskUtils ?
+    TaskUtils.setBackendEnabled(backendConfigEntry, true);
+  }
+
+  /**
+   * Retrieves a synchronization domain based on the baseDN.
+   *
+   * @param baseDN The baseDN of the domain to retrieve
+   * @return The domain retrieved
+   * @throws DirectoryException When an error occured.
+   */
+  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
+  throws DirectoryException
+  {
+    SynchronizationDomain synchronizationDomain = null;
+
+    // Retrieves the domain
+    DirectoryServer.getSynchronizationProviders();
+    for (SynchronizationProvider provider :
+      DirectoryServer.getSynchronizationProviders())
+    {
+      if (!( provider instanceof MultimasterSynchronization))
+      {
+        int msgID = LogMessages.MSGID_INVALID_PROVIDER;
+        String message = getMessage(msgID);
+        throw new DirectoryException(ResultCode.OTHER,
+            message, msgID);
+      }
+
+      // From the domainDN retrieves the synchronization domain
+      SynchronizationDomain sdomain =
+        MultimasterSynchronization.findDomain(baseDN, null);
+      if (sdomain == null)
+      {
+        int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
+        String message = getMessage(msgID) + " " + baseDN;
+        throw new DirectoryException(ResultCode.OTHER,
+            message, msgID);
+      }
+
+      if (synchronizationDomain != null)
+      {
+        // Should never happen
+        int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
+        String message = getMessage(msgID);
+        throw new DirectoryException(ResultCode.OTHER,
+            message, msgID);
+      }
+      synchronizationDomain = sdomain;
+    }
+    return synchronizationDomain;
+  }
+
+  /**
+   * Returns the backend associated to this domain.
+   * @return The associated backend.
+   */
+  public Backend getBackend()
+  {
+    return backend;
+  }
+
+  /**
+   * Returns a boolean indiciating if an import or export is currently
+   * processed.
+   * @return The status
+   */
+  public boolean ieRunning()
+  {
+    return (ieContext != null);
+  }
+  /*
+   * <<Total Update
+   */
+
+
   /**
    * Push the modifications contain the in given parameter has
    * a modification that would happen on a local server.
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java b/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
index b57cc5d..d1ceb76 100644
--- a/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,11 +112,16 @@
     /* Read the first 8 bytes containing the packet length */
     int length = 0;
 
+    /* Let's start the stop-watch before waiting on read */
+    /* for the heartbeat check to be operationnal        */
+    lastReceiveTime = System.currentTimeMillis();
+
     while (length<8)
     {
       int read = input.read(rcvLengthBuf, length, 8-length);
       if (read == -1)
       {
+        lastReceiveTime=0;
         throw new IOException("no more data");
       }
       else
@@ -135,8 +140,9 @@
       {
         length += input.read(buffer, length, totalLength - length);
       }
-
-      lastReceiveTime = System.currentTimeMillis();
+      /* We do not want the heartbeat to close the session when */
+      /* we are processing a message even a time consuming one. */
+      lastReceiveTime=0;
       return SynchronizationMessage.generateMsg(buffer);
     }
     catch (OutOfMemoryError e)
@@ -159,6 +165,10 @@
    */
   public long getLastReceiveTime()
   {
+    if (lastReceiveTime==0)
+    {
+      return System.currentTimeMillis();
+    }
     return lastReceiveTime;
   }
 
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
index aab19be..59777e5 100644
--- a/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,6 +47,13 @@
   static final byte MSG_TYPE_CHANGELOG_START = 7;
   static final byte MSG_TYPE_WINDOW = 8;
   static final byte MSG_TYPE_HEARTBEAT = 9;
+  static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
+  static final byte MSG_TYPE_INITIALIZE_TARGET = 11;
+  static final byte MSG_TYPE_ENTRY = 12;
+  static final byte MSG_TYPE_DONE = 13;
+  static final byte MSG_TYPE_ERROR = 14;
+  // Adding a new type of message here probably requires to
+  // change accordingly generateMsg method below
 
   /**
    * Return the byte[] representation of this message.
@@ -60,6 +67,11 @@
    * MSG_TYPE_CHANGELOG_START
    * MSG_TYPE_WINDOW
    * MSG_TYPE_HEARTBEAT
+   * MSG_TYPE_INITIALIZE
+   * MSG_TYPE_INITIALIZE_TARGET
+   * MSG_TYPE_ENTRY
+   * MSG_TYPE_DONE
+   * MSG_TYPE_ERROR
    *
    * @return the byte[] representation of this message.
    */
@@ -107,6 +119,21 @@
       case MSG_TYPE_HEARTBEAT:
         msg = new HeartbeatMessage(buffer);
       break;
+      case MSG_TYPE_INITIALIZE_REQUEST:
+        msg = new InitializeRequestMessage(buffer);
+      break;
+      case MSG_TYPE_INITIALIZE_TARGET:
+        msg = new InitializeTargetMessage(buffer);
+      break;
+      case MSG_TYPE_ENTRY:
+        msg = new EntryMessage(buffer);
+      break;
+      case MSG_TYPE_DONE:
+        msg = new DoneMessage(buffer);
+      break;
+      case MSG_TYPE_ERROR:
+        msg = new ErrorMessage(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index f89e5a2..f45713f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,6 +26,7 @@
  */
 package org.opends.server.synchronization;
 
+import static org.opends.server.loggers.Error.logError;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -54,6 +55,8 @@
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
 import org.opends.server.types.ByteStringFactory;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.types.SearchScope;
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.AttributeType;
@@ -158,7 +161,11 @@
         }
       }
       catch (Exception e)
-      { }
+      { 
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.NOTICE,
+            "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1);
+      }
     }
     return broker;
   }
@@ -221,7 +228,8 @@
         }
       }
       catch (Exception e)
-      { }
+      { 
+      }
     }
     return broker;
   }
@@ -231,6 +239,10 @@
    */
   protected void cleanEntries()
   {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "SynchronizationTestCase/Cleaning entries" , 1);
+
     DeleteOperation op;
     // Delete entries
     try
@@ -238,6 +250,10 @@
       while (true)
       {
         DN dn = entryList.removeLast();
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.NOTICE,
+            "cleaning entry " + dn, 1);
+
         op = new DeleteOperation(connection, InternalClientConnection
             .nextOperationID(), InternalClientConnection.nextMessageID(), null,
             dn);
@@ -264,8 +280,13 @@
     // WORKAROUND FOR BUG #639 - BEGIN -
     if (mms != null)
     {
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+          ErrorLogSeverity.NOTICE,
+          "SynchronizationTestCase/FinalizeSynchronization Provider" , 1);
+
       DirectoryServer.deregisterSynchronizationProvider(mms);
       mms.finalizeSynchronizationProvider();
+      mms = null;
     }
     // WORKAROUND FOR BUG #639 - END -
 
@@ -303,17 +324,22 @@
 
     //
     // Add the changelog server
-    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
-    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
+    if (changeLogEntry!=null)
+    {
+      DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
+      assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
         "Unable to add the changeLog server");
-    entryList.add(changeLogEntry.getDN());
-
-    //
-    // We also have a replicated suffix (synchronization domain)
-    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
-    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
-        "Unable to add the synchronized server");
-    entryList.add(synchroServerEntry.getDN());
+      entryList.add(changeLogEntry.getDN());
+    }
+    
+    if (synchroServerEntry!=null)
+    {
+      // We also have a replicated suffix (synchronization domain)
+      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+      "Unable to add the synchronized suffix");
+      entryList.add(synchroServerEntry.getDN());
+    }
   }
 
   /**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 0b1bc2e..dcf23e0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@
           + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
           + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
           + "ds-cfg-window-size: 100" + "\n"
-          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
+          + "ds-cfg-changelog-db-directory: changelogDb"+i;
         Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
         ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
         changelogs[i] = new Changelog(changelogConfig);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
index 20e8a20..0dccba8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,16 +26,18 @@
  */
 package org.opends.server.synchronization.protocol;
 
+import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.UUID;
 import java.util.zip.DataFormatException;
 
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import static org.testng.Assert.*;
-
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
@@ -56,8 +58,8 @@
 import org.opends.server.types.ObjectClass;
 import org.opends.server.types.RDN;
 import org.opends.server.util.TimeThread;
-
-import static org.opends.server.synchronization.protocol.OperationContext.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
 
 /**
  * Test the contructors, encoders and decoders of the synchronization
@@ -520,6 +522,104 @@
   }
 
   /**
+   * Test that EntryMessage encoding and decoding works
+   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
+   */
+  @Test()
+  public void EntryMessageTest() throws Exception
+  {
+    String taskInitFromS2 = new String(
+        "dn: ds-task-id=" + UUID.randomUUID() +
+        ",cn=Scheduled Tasks,cn=Tasks\n" +
+        "objectclass: top\n" +
+        "objectclass: ds-task\n" +
+        "objectclass: ds-task-initialize\n" +
+        "ds-task-class-name: org.opends.server.tasks.InitializeTask" +
+        "ds-task-initialize-domain-dn: dc=example,dc=com" +
+        "ds-task-initialize-source: 1");
+    short sender = 1;
+    short target = 2;
+    byte[] entry = taskInitFromS2.getBytes();
+    EntryMessage msg = new EntryMessage(sender, target, entry);
+    EntryMessage newMsg = new EntryMessage(msg.getBytes());
+    assertEquals(msg.getsenderID(), newMsg.getsenderID());
+    assertEquals(msg.getDestination(), newMsg.getDestination());
+    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
+  }
+
+  /**
+   * Test that InitializeRequestMessage encoding and decoding works
+   */
+  @Test()
+  public void InitializeRequestMessageTest() throws Exception
+  {
+    short sender = 1;
+    short target = 2;
+    InitializeRequestMessage msg = new InitializeRequestMessage(
+        DN.decode("dc=example"), sender, target);
+    InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes());
+    assertEquals(msg.getsenderID(), newMsg.getsenderID());
+    assertEquals(msg.getDestination(), newMsg.getDestination());
+    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
+  }
+
+  /**
+   * Test that InitializeTargetMessage encoding and decoding works
+   */
+  @Test()
+  public void InitializeTargetMessageTest() throws Exception
+  {
+    short senderID = 1;
+    short targetID = 2;
+    short requestorID = 3;
+    long entryCount = 4;
+    DN baseDN = DN.decode("dc=example");
+      
+    InitializeTargetMessage msg = new InitializeTargetMessage(
+        baseDN, senderID, targetID, requestorID, entryCount);
+    InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes());
+    assertEquals(msg.getsenderID(), newMsg.getsenderID());
+    assertEquals(msg.getDestination(), newMsg.getDestination());
+    assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
+    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
+    assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
+    
+    assertEquals(senderID, newMsg.getsenderID());
+    assertEquals(targetID, newMsg.getDestination());
+    assertEquals(requestorID, newMsg.getRequestorID());
+    assertEquals(entryCount, newMsg.getEntryCount());
+    assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
+
+  }
+
+  /**
+   * Test that DoneMessage encoding and decoding works
+   */
+  @Test()
+  public void DoneMessage() throws Exception
+  {
+    DoneMessage msg = new DoneMessage((short)1, (short)2);
+    DoneMessage newMsg = new DoneMessage(msg.getBytes());
+    assertEquals(msg.getsenderID(), newMsg.getsenderID());
+    assertEquals(msg.getDestination(), newMsg.getDestination());
+  }
+
+  /**
+   * Test that ErrorMessage encoding and decoding works
+   */
+  @Test()
+  public void ErrorMessage() throws Exception
+  {
+    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
+    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
+    assertEquals(msg.getsenderID(), newMsg.getsenderID());
+    assertEquals(msg.getDestination(), newMsg.getDestination());
+    assertEquals(msg.getMsgID(), newMsg.getMsgID());
+    assertEquals(msg.getDetails(), newMsg.getDetails());
+  }
+
+
+  /**
    * Test PendingChange
    */
   private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)

--
Gitblit v1.10.0