mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
29.53.2007 40cef7d36084fbe86d34cfa497628d8972c4c9e7
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,47 +26,59 @@
 */
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_ABBR;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_FULL;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.ConfigMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -77,19 +89,30 @@
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
import org.opends.server.synchronization.protocol.DoneMessage;
import org.opends.server.synchronization.protocol.EntryMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -137,8 +160,96 @@
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  short serverId;
  private short serverId;
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   */
  private class IEContext
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    SynchroLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMessage.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry left to be processed
    long entryLeftCount = 0;
    // The exception raised when any
    DirectoryException exception = null;
    /**
     * Initializes the counters of the task with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initTaskCounters(long count)
    {
      entryCount = count;
      entryLeftCount = count;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
      }
    }
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     */
    public void updateTaskCounters()
    {
      entryLeftCount--;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
        }
      }
    }
    /**
     * Update the state of the task.
     */
    protected TaskState updateTaskCompletionState()
    {
      if (exception == null)
        return TaskState.COMPLETED_SUCCESSFULLY;
      else
        return TaskState.STOPPED_BY_ERROR;
    }
  }
  // The context related to an import or export being processed
  // Null when none is being processed.
  private IEContext ieContext = null;
  // The backend informations necessary to make an import or export.
  private Backend backend;
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
@@ -160,6 +271,7 @@
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -205,8 +317,6 @@
    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
  }
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   *
@@ -217,6 +327,7 @@
  public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
  {
    super("Synchronization flush");
    /*
     * read the centralized changelog server configuration
     * this is a multivalued attribute
@@ -397,6 +508,10 @@
        if (!receiveStatus)
          broker.suspendReceive();
      }
      // Retrieves the related backend and its config entry
      retrievesBackendInfos(baseDN);
    } catch (Exception e)
    {
     /* TODO should mark that changelog service is
@@ -803,9 +918,9 @@
  }
  /**
   * Receive an update message from the changelog.
   * Receives an update message from the changelog.
   * also responsible for updating the list of pending changes
   * @return the received message
   * @return the received message - null if none
   */
  public UpdateMessage receive()
  {
@@ -823,7 +938,7 @@
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
@@ -834,6 +949,56 @@
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
            }
            catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
            }
            catch(DirectoryException de)
            {
              // Return an error message to notify the sender
              int msgID = de.getErrorMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
            }
          }
          else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  changelog did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
@@ -876,7 +1041,9 @@
  public void receiveAck(AckMessage ack)
  {
    UpdateMessage update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    ChangeNumber changeNumber;
    changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    {
@@ -1105,7 +1272,7 @@
        synchronized (this)
        {
          this.wait(1000);
          if (!disabled )
          if (!disabled && !stateSavingDisabled )
          {
            // save the RUV
            state.save();
@@ -1151,6 +1318,8 @@
      this.notify();
    }
    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
    // stop the ChangelogBroker
    broker.stop();
  }
@@ -1857,6 +2026,7 @@
    return broker.getNumLostConnections();
  }
  /**
   * Check if the domain solve conflicts.
   *
@@ -1933,6 +2103,988 @@
    // Nothing is needed at the moment
  }
  /*
   * Total Update >>
   */
  /**
   * Receives bytes related to an entry in the context of an import to
   * initialize the domain (called by SynchronizationDomainLDIFInputStream).
   *
   * @return The bytes. Null when the Done or Err message has been received
   */
  public byte[] receiveEntryBytes()
  {
    SynchronizationMessage msg;
    while (true)
    {
      try
      {
        msg = broker.receive();
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        log("receiveEntryBytes: received " + msg);
        if (msg instanceof EntryMessage)
        {
          // FIXME
          EntryMessage entryMsg = (EntryMessage)msg;
          byte[] entryBytes = entryMsg.getEntryBytes().clone();
          ieContext.updateTaskCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          return null;
        }
        else if (msg instanceof ErrorMessage)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ieContext.exception = new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails() , errorMsg.getMsgID());
          return null;
        }
        else
        {
          // Other messages received during an import are trashed
        }
      }
      catch(Exception e)
      {
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            "received an unexpected message type" , 1, e);
      }
      return null;
    }
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * @param errorMsg The error message received.
   */
  protected void abandonImportExport(ErrorMessage errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
          errorMsg.getDetails() , errorMsg.getMsgID());
      if (ieContext.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
        ieContext = null;
      }
    }
  }
  /**
   * Clears all the entries from the JE backend determined by the
   * be id passed into the method.
   *
   * @param  createBaseEntry  Indicate whether to automatically create the base
   *                          entry and add it to the backend.
   * @param beID  The be id to clear.
   * @param dn   The suffix of the backend to create if the the createBaseEntry
   *             boolean is true.
   * @throws Exception  If an unexpected problem occurs.
   */
  public static void clearJEBackend(boolean createBaseEntry, String beID,
      String dn) throws Exception
  {
    BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
    DN[] baseDNs = backend.getBaseDNs();
    // FIXME Should getConfigEntry be part of TaskUtils ?
    ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
    // FIXME Should setBackendEnabled be part of TaskUtils ?
    TaskUtils.setBackendEnabled(configEntry, false);
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
      {
        throw new RuntimeException(failureReason.toString());
      }
      try
      {
        backend.clearBackend(configEntry, baseDNs);
      }
      finally
      {
        LockFileManager.releaseLock(lockFile, failureReason);
      }
    }
    finally
    {
      TaskUtils.setBackendEnabled(configEntry, true);
    }
    if (createBaseEntry)
    {
      DN baseDN = DN.decode(dn);
      Entry e = createEntry(baseDN);
      backend = (BackendImpl)DirectoryServer.getBackend(beID);
      backend.addEntry(e, null);
    }
  }
  /**
   * Log debug message.
   * @param message The message to log.
   */
  private void log(String message)
  {
    if (debugEnabled())
    {
      debugInfo("DebugInfo" + message);
      int    msgID   = MSGID_UNKNOWN_TYPE;
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationDomain/ " + message, msgID);
    }
  }
  /**
   * Export the entries.
   * @throws DirectoryException when an error occured
   */
  protected void exportBackend() throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            String.valueOf(failureReason));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message + " " + stackTraceToSingleLineString(e), msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    //  Launch the export.
    try
    {
      DN[] baseDNs = {this.baseDN};
      backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, de.getErrorMessage());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
      String message = getMessage(msgID, stackTraceToSingleLineString(e));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
          msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    finally
    {
      //  Clean up after the export by closing the export config.
      exportConfig.close();
      //  Release the shared lock on the backend.
      try
      {
        String lockFile = LockFileManager.getBackendLockFileName(backend);
        StringBuilder failureReason = new StringBuilder();
        if (! LockFileManager.releaseLock(lockFile, failureReason))
        {
          int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
          String message = getMessage(msgID, backend.getBackendID(),
              String.valueOf(failureReason));
          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
              message, msgID);
          throw new DirectoryException(
              ResultCode.OTHER, message, msgID, null);
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
        String message = getMessage(msgID, backend.getBackendID(),
            stackTraceToSingleLineString(e));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
  }
  /**
   * Retrieves the backend object related to the domain and the backend's
   * config entry. They will be used for import and export.
   * TODO This should be in a shared package rather than here.
   *
   * @param baseDN The baseDN to retrieve the backend
   * @throws DirectoryException when an error occired
   */
  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
  {
    ArrayList<Backend>     backendList = new ArrayList<Backend>();
    ArrayList<ConfigEntry> entryList   = new ArrayList<ConfigEntry>();
    ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
    Backend backend = null;
    ConfigEntry backendConfigEntry = null;
    List<DN> branches = new ArrayList<DN>(0);
    // Retrieves the backend related to this domain
    Backend domainBackend = DirectoryServer.getBackend(baseDN);
    if (domainBackend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    // Retrieves its config entry and its DNs
    int code = getBackends(backendList, entryList, dnList);
    if (code != 0)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    int numBackends = backendList.size();
    for (int i=0; i < numBackends; i++)
    {
      Backend b = backendList.get(i);
      if (domainBackend.getBackendID() != b.getBackendID())
      {
        continue;
      }
      if (backend == null)
      {
        backend = domainBackend;
        backendConfigEntry = entryList.get(i).duplicate();
        branches = dnList.get(i);
      }
      else
      {
        int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
        String message = getMessage(msgID, domainBackend.getBackendID());
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
    }
    if (backend == null)
    {
      int    msgID   = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
      String message = getMessage(msgID, domainBackend.getBackendID());
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    else if (! backend.supportsLDIFExport())
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_IMPORT;
      String message = getMessage(msgID, 0); // FIXME
      logError(ErrorLogCategory.BACKEND,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    this.backend = backend;
    this.backendConfigEntry = backendConfigEntry;
    this.branches = branches;
  }
  /**
   * Sends lDIFEntry entry lines to the export target currently set.
   *
   * @param lDIFEntry The lines for the LDIF entry.
   * @throws IOException when an error occured.
   */
  public void sendEntryLines(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // we just let down the export.
    if (ieContext.exception != null)
    {
      IOException ioe = new IOException(ieContext.exception.getMessage());
      ieContext = null;
      throw ioe;
    }
    // new entry then send the current one
    EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    ieContext.updateTaskCounters();
  }
  /**
   * Retrieves information about the backends defined in the Directory Server
   * configuration.
   *
   * @param  backendList  A list into which instantiated (but not initialized)
   *                      backend instances will be placed.
   * @param  entryList    A list into which the config entries associated with
   *                      the backends will be placed.
   * @param  dnList       A list into which the set of base DNs for each backend
   *                      will be placed.
   */
  private static int getBackends(ArrayList<Backend> backendList,
                                 ArrayList<ConfigEntry> entryList,
                                 ArrayList<List<DN>> dnList)
  throws DirectoryException
  {
    //  Get the base entry for all backend configuration.
    DN backendBaseDN = null;
    try
    {
      backendBaseDN = DN.decode(DN_BACKEND_BASE);
    }
    catch (DirectoryException de)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    ConfigEntry baseEntry = null;
    try
    {
      baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
    }
    catch (ConfigException ce)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    catch (Exception e)
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
      String message = getMessage(msgID, DN_BACKEND_BASE,
          stackTraceToSingleLineString(e));
      throw new DirectoryException(
          ResultCode.OTHER, message, msgID, null);
    }
    //  Iterate through the immediate children, attempting to parse them as
    //  backends.
    for (ConfigEntry configEntry : baseEntry.getChildren().values())
    {
      // Get the backend ID attribute from the entry.  If there isn't one, then
      // skip the entry.
      String backendID = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
        StringConfigAttribute idStub =
          new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
              true, false, true);
        StringConfigAttribute idAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
        if (idAttr == null)
        {
          continue;
        }
        else
        {
          backendID = idAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      //    Get the backend class name attribute from the entry.  If there isn't
      //    one, then just skip the entry.
      String backendClassName = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
        StringConfigAttribute classStub =
          new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
              true, false, false);
        StringConfigAttribute classAttr =
          (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
        if (classAttr == null)
        {
          continue;
        }
        else
        {
          backendClassName = classAttr.activeValue();
        }
      }
      catch (ConfigException ce)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            ce.getMessage());
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Class backendClass = null;
      try
      {
        backendClass = Class.forName(backendClassName);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);
      }
      Backend backend = null;
      try
      {
        backend = (Backend) backendClass.newInstance();
        backend.setBackendID(backendID);
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS;
        String message = getMessage(msgID, backendClassName,
            String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      // Get the base DN attribute from the entry.  If there isn't one, then
      // just skip this entry.
      List<DN> baseDNs = null;
      try
      {
        int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
        DNConfigAttribute baseDNStub =
          new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
              true, true, true);
        DNConfigAttribute baseDNAttr =
          (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
        if (baseDNAttr == null)
        {
          msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND;
          String message = getMessage(msgID,
              String.valueOf(configEntry.getDN()));
          throw new DirectoryException(
              DirectoryServer.getServerErrorResultCode(), message,msgID, null);
        }
        else
        {
          baseDNs = baseDNAttr.activeValues();
        }
      }
      catch (Exception e)
      {
        int    msgID   = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND;
        String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
            stackTraceToSingleLineString(e));
        throw new DirectoryException(
            ResultCode.OTHER, message, msgID, null);      }
      backendList.add(backend);
      entryList.add(configEntry);
      dnList.add(baseDNs);
    }
    return 0;
      }
  /**
   * Initializes this domain from another source server.
   *
   * @param source The source from which to initialize
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initialize(short source, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.initializeTask = initTask;
    InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
        baseDN, serverId, source);
    // Publish Init request msg
    broker.publish(initializeMsg);
    // .. we expect to receive entries or err after that
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representaing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    short  source = 0;
    Throwable cause = null;
    try
    {
      source = Integer.decode(sourceString).shortValue();
      if (source >= -1)
      {
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        log("Source decoded for import:" + source);
        return source;
      }
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param targetString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeTarget(String targetString)
  throws DirectoryException
  {
    short  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMessage.ALL_SERVERS;
    }
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString).shortValue();
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
      }
      return target;
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
    String message = getMessage(errorMessageID);
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, errorMessageID, cause);
    else
      throw new DirectoryException(
          resultCode, message, errorMessageID);
  }
  private synchronized void acquireIEContext()
  throws DirectoryException
  {
    if (ieContext != null)
    {
      // Rejects 2 simultaneous exports
      int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
      String message = getMessage(msgID);
      throw new DirectoryException(ResultCode.OTHER,
          message, msgID);
    }
    ieContext = new IEContext();
  }
  private synchronized void releaseIEContext()
  {
    ieContext = null;
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument.
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, Task initTask)
  throws DirectoryException
  {
    initializeTarget(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization has been
   * initiated by another server than this one.
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    acquireIEContext();
    ieContext.exportTarget = target;
    ieContext.initializeTask = initTask;
    ieContext.initTaskCounters(backend.getEntryCount());
    // Send start message
    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
        baseDN, serverId, ieContext.exportTarget, requestorID,
        ieContext.entryLeftCount);
    log("SD : publishes " + initializeMessage +
        " for #entries=" + ieContext.entryCount);
    broker.publish(initializeMessage);
    // make an export and send entries
    exportBackend();
    // Successfull termnation
    DoneMessage doneMsg = new DoneMessage(serverId,
      initializeMessage.getDestination());
    broker.publish(doneMsg);
    if (ieContext != null)
    {
      ieContext.updateTaskCompletionState();
      ieContext = null;
    }
  }
  /**
   * Process backend before import.
   * @param backend The backend.
   * @param backendConfigEntry The config entry of the backend.
   * @throws Exception
   */
  private void preBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws Exception
  {
    // Stop saving state
    stateSavingDisabled = true;
    // Clear the backend
    clearJEBackend(false,backend.getBackendID(),null);
    // FIXME setBackendEnabled should be part of TaskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, false);
    // Acquire an exclusive lock for the backend.
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      throw new DirectoryException(ResultCode.OTHER, message, msgID);
    }
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void importBackend(InitializeTargetMessage initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    try
    {
      log("startImport");
      if (initializeMessage.getRequestorID() == serverId)
      {
        // The import responds to a request we did so the IEContext
        // is already acquired
      }
      else
      {
        acquireIEContext();
      }
      ieContext.importSource = initializeMessage.getsenderID();
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initTaskCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend, this.backendConfigEntry);
      DN[] baseDNs = {baseDN};
      ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
      importConfig =
        new LDIFImportConfig(ieContext.ldifImportInputStream);
      importConfig.setIncludeBranches(this.branches);
      // TODO How to deal with rejected entries during the import
      // importConfig.writeRejectedEntries("rejectedImport",
      // ExistingFileBehavior.OVERWRITE);
      // Process import
      this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
      stateSavingDisabled = false;
      // Re-exchange state with SS
      broker.stop();
      broker.start(changelogServers);
    }
    catch(Exception e)
    {
      throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
          2);// FIXME
    }
    finally
    {
      // Cleanup
      importConfig.close();
      // Re-enable backend
      closeBackendImport(this.backend, this.backendConfigEntry);
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      {
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
      }
      releaseIEContext();
      log("End importBackend");
    }
    // Success
  }
  /**
   * Make post import operations.
   * @param backend The backend implied in the import.
   * @param backendConfigEntry The config entry of the backend.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void closeBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
  throws DirectoryException
  {
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();
    // Release lock
    if (!LockFileManager.releaseLock(lockFile, failureReason))
    {
      int    msgID   = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      new DirectoryException(ResultCode.OTHER, message, msgID);
    }
    // FIXME setBackendEnabled should be part taskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, true);
  }
  /**
   * Retrieves a synchronization domain based on the baseDN.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @return The domain retrieved
   * @throws DirectoryException When an error occured.
   */
  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
  throws DirectoryException
  {
    SynchronizationDomain synchronizationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterSynchronization))
      {
        int msgID = LogMessages.MSGID_INVALID_PROVIDER;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      // From the domainDN retrieves the synchronization domain
      SynchronizationDomain sdomain =
        MultimasterSynchronization.findDomain(baseDN, null);
      if (sdomain == null)
      {
        int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
        String message = getMessage(msgID) + " " + baseDN;
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      if (synchronizationDomain != null)
      {
        // Should never happen
        int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      synchronizationDomain = sdomain;
    }
    return synchronizationDomain;
  }
  /**
   * Returns the backend associated to this domain.
   * @return The associated backend.
   */
  public Backend getBackend()
  {
    return backend;
  }
  /**
   * Returns a boolean indiciating if an import or export is currently
   * processed.
   * @return The status
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
  }
  /*
   * <<Total Update
   */
  /**
   * Push the modifications contain the in given parameter has
   * a modification that would happen on a local server.