mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -27,8 +27,6 @@
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -38,12 +36,14 @@
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.protocols.asn1.ASN1OctetString;
import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -52,12 +52,14 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Adler32;
import java.io.OutputStream;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.api.AlertGenerator;
import org.opends.server.api.Backend;
import org.opends.server.api.DirectoryThread;
@@ -77,6 +79,9 @@
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
@@ -89,6 +94,7 @@
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
@@ -100,6 +106,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.RDN;
import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
@@ -172,6 +179,9 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private long generationId = -1;
  private long rejectedGenerationId = -1;
  private boolean requestedResetSinceLastStart = false;
  /**
   * This object is used to store the list of update currently being
@@ -224,7 +234,7 @@
  private int window = 100;
  /**
   * The isoalation policy that this domain is going to use.
   * The isolation policy that this domain is going to use.
   * This field describes the behavior of the domain when an update is
   * attempted and the domain could not connect to any Replication Server.
   * Possible values are accept-updates or deny-updates, but other values
@@ -254,17 +264,20 @@
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry left to be processed
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    boolean checksumOutput = false;
    // The exception raised when any
    DirectoryException exception = null;
    long checksumOutputValue = (long)0;
    /**
     * Initializes the counters of the task with the provider value.
     * Initializes the import/export counters with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initTaskCounters(long count)
    public void initImportExportCounters(long count)
    {
      entryCount = count;
      entryLeftCount = count;
@@ -288,7 +301,7 @@
     * Update the counters of the task for each entry processed during
     * an import or export.
     */
    public void updateTaskCounters()
    public void updateCounters()
    {
      entryLeftCount--;
@@ -342,12 +355,12 @@
    configDn = configuration.dn();
    /*
    * Modify conflicts are solved for all suffixes but the schema suffix
    * because we don't want to store extra information in the schema
    * ldif files.
    * This has no negative impact because the changes on schema should
    * not produce conflicts.
    */
     * Modify conflicts are solved for all suffixes but the schema suffix
     * because we don't want to store extra information in the schema
     * ldif files.
     * This has no negative impact because the changes on schema should
     * not produce conflicts.
     */
    if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
    {
      solveConflictFlag = false;
@@ -370,27 +383,33 @@
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    backend = retrievesBackend(baseDN);
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
                                  baseDN.toNormalizedString()));
    }
    try
    {
      generationId = loadGenerationId();
    }
    catch (DirectoryException e)
    {
      logError(ERR_LOADING_GENERATION_ID.get(
          baseDN.toNormalizedString(), e.getLocalizedMessage()));
    }
    /*
     * create the broker object used to publish and receive changes
     */
    broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
        maxReceiveDelay, maxSendQueue, maxSendDelay, window,
        heartbeatInterval, new ReplSessionSecurity(configuration));
        heartbeatInterval, generationId,
        new ReplSessionSecurity(configuration));
    broker.start(replicationServers);
    // Retrieves the related backend and its config entry
    try
    {
      retrievesBackendInfos(baseDN);
    } catch (DirectoryException e)
    {
      // The backend associated to this suffix is not able to
      // perform export and import.
      // The replication can continue but this replicationDomain
      // won't be able to use total update.
    }
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
@@ -558,7 +577,7 @@
   * If not set the ResultCode and the response message,
   * interrupt the operation, and return false
   *
   * @param   op  The Operation that needs to be checked.
   * @param   Operation  The Operation that needs to be checked.
   *
   * @return  true when it OK to process the Operation, false otherwise.
   *          When false is returned the resultCode and the reponse message
@@ -798,7 +817,11 @@
              // The server is in the shutdown process
              return null;
            }
            log(Message.raw("Broker received message :" + msg));
            if (debugEnabled())
              if (!(msg instanceof HeartbeatMessage))
                TRACER.debugInfo("Message received <" + msg + ">");
            if (msg instanceof AckMessage)
            {
              AckMessage ack = (AckMessage) msg;
@@ -808,7 +831,7 @@
            {
              // Another server requests us to provide entries
              // for a total update
              initMsg = (InitializeRequestMessage) msg;
              initMsg = (InitializeRequestMessage)msg;
            }
            else if (msg instanceof InitializeTargetMessage)
            {
@@ -822,20 +845,19 @@
                // bunch of entries from the remote server and we
                // want the import thread to catch them and
                // not the ListenerThread.
                importBackend(importMsg);
                initialize(importMsg);
              }
              catch(DirectoryException de)
              {
                // Returns an error message to notify the sender
                ErrorMessage errorMsg =
                  new ErrorMessage(importMsg.getsenderID(),
                                   de.getMessageObject());
                MessageBuilder mb = new MessageBuilder();
                mb.append(de.getMessageObject());
                mb.append("Backend ID: ");
                mb.append(backend.getBackendID());
                log(mb.toMessage());
                // Return an error message to notify the sender
                ErrorMessage errorMsg =
                  new ErrorMessage(importMsg.getsenderID(),
                                   de.getMessageObject());
                TRACER.debugInfo(Message.toString(mb.toMessage()));
                broker.publish(errorMsg);
              }
            }
@@ -850,6 +872,39 @@
                //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
              else
              {
                /* We can receive an error message from the replication server
                 * in the following cases :
                 * - we connected with an incorrect generation id
                 */
                ErrorMessage errorMsg = (ErrorMessage)msg;
                logError(ERR_ERROR_MSG_RECEIVED.get(
                           errorMsg.getDetails()));
                if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId())
                {
                  TRACER.debugInfo("requestedResetSinceLastStart=" +
                             requestedResetSinceLastStart +
                            "rejectedGenerationId=" + rejectedGenerationId);
                  if (requestedResetSinceLastStart && (rejectedGenerationId>0))
                  {
                    // When the last generation presented was refused and we are
                    // the 'reseter' server then restart automatically to become
                    // the 'master'
                    state.clear();
                    rejectedGenerationId = -1;
                    requestedResetSinceLastStart = false;
                    broker.stop();
                    broker.start(replicationServers);
                  }
                }
                if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId())
                {
                  rejectedGenerationId = generationId;
                }
              }
            }
            else if (msg instanceof UpdateMessage)
            {
@@ -875,7 +930,7 @@
        {
          try
          {
            initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
                null);
          }
          catch(DirectoryException de)
@@ -2100,7 +2155,7 @@
  public void disable()
  {
    state.save();
    state.clear();
    state.clearInMemory();
    disabled = true;
    //  stop the listener threads
    for (ListenerThread thread : synchroThreads)
@@ -2126,20 +2181,252 @@
   * The domain will connect back to a replication Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * server.
   * The generationId will be retrieved or computed if necessary.
   * The ServerState will also be read again from the local database.
   */
  public void enable()
  {
    state.clear();
    state.clearInMemory();
    state.loadState();
    disabled = false;
    try
    {
      generationId = loadGenerationId();
    }
    catch (Exception e)
    {
      /* TODO should mark that replicationServer service is
       * not available, log an error and retry upon timeout
       * should we stop the modifications ?
       */
      logError(ERR_LOADING_GENERATION_ID.get(
          baseDN.toNormalizedString(), e.getLocalizedMessage()));
      return;
    }
    // After an on-line import, the value of the generationId is new
    // and it is necessary for the broker to send this new value as part
    // of the serverStart message.
    broker.setGenerationId(generationId);
    broker.start(replicationServers);
    createListeners();
  }
  /**
   * Compute the data generationId associated with the current data present
   * in the backend for this domain.
   * @return The computed generationId.
   * @throws DirectoryException When an error occurs.
   */
  public long computeGenerationId() throws DirectoryException
  {
    long bec = backend.getEntryCount();
    if (bec<0)
      backend = this.retrievesBackend(baseDN);
    bec = backend.getEntryCount();
    this.acquireIEContext();
    ieContext.checksumOutput = true;
    ieContext.entryCount = (bec<1000?bec:1000);
    ieContext.entryLeftCount = ieContext.entryCount;
    exportBackend();
    long genId = ieContext.checksumOutputValue;
    if (debugEnabled())
      TRACER.debugInfo("Computed generationId: #entries=" + bec +
               " generationId=" + ieContext.checksumOutputValue);
    ieContext.checksumOutput = false;
    this.releaseIEContext();
    return genId;
  }
  /**
   * Returns the generationId set for this domain.
   *
   * @return The generationId.
   */
  public long getGenerationId()
  {
    return generationId;
  }
  /**
   * The attribute name used to store the state in the backend.
   */
  protected static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  /**
   * Stores the value of the generationId.
   * @param generationId The value of the generationId.
   * @return a ResultCode indicating if the method was successfull.
   */
  public ResultCode saveGenerationId(long generationId)
  {
    // The generationId is stored in the root entry of the domain.
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
    ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
    ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
    values.add(value);
    LDAPAttribute attr =
      new LDAPAttribute(REPLICATION_GENERATION_ID, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
    mods.add(mod);
    ModifyOperationBasis op =
      new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(),
          new ArrayList<Control>(0), asn1BaseDn,
          mods);
    op.setInternalOperation(true);
    op.setSynchronizationOperation(true);
    op.setDontSynchronize(true);
    op.run();
    ResultCode result = op.getResultCode();
    if (result != ResultCode.SUCCESS)
    {
      Message message = ERR_UPDATING_GENERATION_ID.get(
                          op.getResultCode().getResultCodeName() + " " +
                          op.getErrorMessage(),
                          baseDN.toString());
      logError(message);
    }
    return result;
  }
  /**
   * Load the GenerationId from the root entry of the domain
   * from the REPLICATION_GENERATION_ID attribute in database
   * to memory, or compute it if not found.
   *
   * @return generationId The retrieved value of generationId
   * @throws DirectoryException When an error occurs.
   */
  public long loadGenerationId()
  throws DirectoryException
  {
    long generationId=-1;
    if (debugEnabled())
      TRACER.debugInfo(
          "Attempt to read generation ID from DB " + baseDN.toString());
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
    boolean found = false;
    LDAPFilter filter;
    try
    {
      filter = LDAPFilter.decode("objectclass=*");
    }
    catch (LDAPException e)
    {
      // can not happen
      return -1;
    }
    /*
     * Search the database entry that is used to periodically
     * save the ServerState
     */
    InternalSearchOperation search = null;
    LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
    attributes.add(REPLICATION_GENERATION_ID);
    search = conn.processSearch(asn1BaseDn,
        SearchScope.BASE_OBJECT,
        DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
        filter,attributes);
    if (((search.getResultCode() != ResultCode.SUCCESS)) &&
        ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
    {
      Message message = ERR_SEARCHING_GENERATION_ID.get(
          search.getResultCode().getResultCodeName() + " " +
          search.getErrorMessage(),
          baseDN.toString());
      logError(message);
    }
    SearchResultEntry resultEntry = null;
    if (search.getResultCode() == ResultCode.SUCCESS)
    {
      LinkedList<SearchResultEntry> result = search.getSearchEntries();
      resultEntry = result.getFirst();
      if (resultEntry != null)
      {
        AttributeType synchronizationGenIDType =
          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
        List<Attribute> attrs =
          resultEntry.getAttribute(synchronizationGenIDType);
        if (attrs != null)
        {
          Attribute attr = attrs.get(0);
          LinkedHashSet<AttributeValue> values = attr.getValues();
          if (values.size()!=1)
          {
            Message message = ERR_LOADING_GENERATION_ID.get(
                baseDN.toString(), "#Values != 1");
            logError(message);
          }
          else
          {
            found=true;
            try
            {
              generationId = Long.decode(values.iterator().next().
                  getStringValue());
            }
            catch(Exception e)
            {
              Message message = ERR_LOADING_GENERATION_ID.get(
                baseDN.toString(), e.getLocalizedMessage());
              logError(message);
            }
          }
        }
      }
    }
    if (!found)
    {
      generationId = computeGenerationId();
      saveGenerationId(generationId);
      if (debugEnabled())
        TRACER.debugInfo("Generation ID created for domain base DN=" +
            baseDN.toString() +
            " generationId=" + generationId);
    }
    else
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "Generation ID successfully read from domain base DN=" + baseDN +
            " generationId=" + generationId);
    }
    return generationId;
  }
  /**
   * Reset the generationId of this domain in the whole topology.
   * A message is sent to the Replication Servers for them to reset
   * their change dbs.
   */
  public void resetGenerationId()
  {
    requestedResetSinceLastStart = true;
    ResetGenerationId genIdMessage = new ResetGenerationId();
    broker.publish(genIdMessage);
  }
  /**
   * Do whatever is needed when a backup is started.
   * We need to make sure that the serverState is correclty save.
   */
@@ -2175,18 +2462,20 @@
      {
        msg = broker.receive();
        if (debugEnabled())
          TRACER.debugInfo("Import: EntryBytes received " + msg);
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        log(Message.raw("receiveEntryBytes: received " + msg));
        if (msg instanceof EntryMessage)
        {
          // FIXME
          EntryMessage entryMsg = (EntryMessage)msg;
          byte[] entryBytes = entryMsg.getEntryBytes().clone();
          ieContext.updateTaskCounters();
          ieContext.updateCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
@@ -2202,8 +2491,9 @@
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ieContext.exception = new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails());
          ieContext.exception = new DirectoryException(
                                      ResultCode.OTHER,
                                      errorMsg.getDetails());
          return null;
        }
        else
@@ -2213,9 +2503,10 @@
      }
      catch(Exception e)
      {
        // TODO: i18n
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            Message.raw("received an unexpected message type"), e);
        return null;
            Message.raw("received an unexpected message type" +
                e.getLocalizedMessage()));
      }
    }
  }
@@ -2299,26 +2590,17 @@
  }
  /**
   * Log debug message.
   * @param message The message to log.
   * Export the entries from the backend.
   * The ieContext must have been set before calling.
   *
   * @throws DirectoryException when an error occured
   */
  private void log(Message message)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("DebugInfo" + message);
    }
  }
  /**
   * Export the entries.
   * @throws DirectoryException when an error occurred
   */
  protected void exportBackend() throws DirectoryException
  protected void exportBackend()
  throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    backend = retrievesBackend(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
@@ -2344,13 +2626,53 @@
          ResultCode.OTHER, message, null);
    }
    ReplLDIFOutputStream os = new ReplLDIFOutputStream(this);
    OutputStream os;
    ReplLDIFOutputStream ros;
    if (ieContext.checksumOutput)
    {
      ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
      os = new CheckedOutputStream(ros, new Adler32());
      try
      {
        os.write((Long.toString(ieContext.entryCount)).getBytes());
      }
      catch(Exception e)
      {
        // Should never happen
      }
    }
    else
    {
      ros = new ReplLDIFOutputStream(this, (short)-1);
      os = ros;
    }
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    // baseDN branch is the only one included in the export
    List<DN> includeBranches = new ArrayList<DN>(1);
    includeBranches.add(this.baseDN);
    exportConfig.setIncludeBranches(includeBranches);
    // For the checksum computing mode, only consider the 'stable' attributes
    if (ieContext.checksumOutput)
    {
      String includeAttributeStrings[] =
        {"objectclass", "sn", "cn", "entryuuid"};
      HashSet<AttributeType> includeAttributes;
      includeAttributes = new HashSet<AttributeType>();
      for (String attrName : includeAttributeStrings)
      {
        AttributeType attrType  = DirectoryServer.getAttributeType(attrName);
        if (attrType == null)
        {
          attrType = DirectoryServer.getDefaultAttributeType(attrName);
        }
        includeAttributes.add(attrType);
      }
      exportConfig.setIncludeAttributes(includeAttributes);
    }
    //  Launch the export.
    try
    {
@@ -2374,8 +2696,19 @@
    }
    finally
    {
      //  Clean up after the export by closing the export config.
      exportConfig.close();
      if ((ieContext != null) && (ieContext.checksumOutput))
      {
        ieContext.checksumOutputValue =
         ((CheckedOutputStream)os).getChecksum().getValue();
      }
      else
      {
        // Clean up after the export by closing the export config.
        // Will also flush the export and export the remaining entries.
        // This is a real export where writer has been initialized.
        exportConfig.close();
      }
      //  Release the shared lock on the backend.
      try
@@ -2403,54 +2736,26 @@
  }
  /**
   * Retrieves the backend object related to the domain and the backend's
   * config entry. They will be used for import and export.
   * TODO This should be in a shared package rather than here.
   * Retrieves the backend related to the domain.
   *
   * @return The backend of that domain.
   * @param baseDN The baseDN to retrieve the backend
   * @throws DirectoryException when an error occired
   */
  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
  protected Backend retrievesBackend(DN baseDN)
  {
    // Retrieves the backend related to this domain
    Backend domainBackend = DirectoryServer.getBackend(baseDN);
    if (domainBackend == null)
    {
      Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, "");
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
    }
    // Retrieves its configuration
    BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend);
    if (backendCfg == null)
    {
      Message message =
          ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get();
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
    }
    this.backend = domainBackend;
    if (! domainBackend.supportsLDIFImport())
    {
      Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get(
              String.valueOf(baseDN));
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
    }
    return DirectoryServer.getBackend(baseDN);
  }
  /**
   * Sends lDIFEntry entry lines to the export target currently set.
   * Exports an entry in LDIF format.
   *
   * @param lDIFEntry The lines for the LDIF entry.
   * @param  lDIFEntry The entry to be exported..
   *
   * @throws IOException when an error occurred.
   */
  public void sendEntryLines(String lDIFEntry) throws IOException
  public void exportLDIFEntry(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // we just let down the export.
@@ -2461,12 +2766,14 @@
      throw ioe;
    }
    // new entry then send the current one
    EntryMessage entryMessage = new EntryMessage(
    if (ieContext.checksumOutput == false)
    {
      // Actually send the entry
      EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    ieContext.updateTaskCounters();
      broker.publish(entryMessage);
    }
    ieContext.updateCounters();
  }
  /**
@@ -2477,9 +2784,11 @@
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initialize(short source, Task initTask)
  public void initializeFromRemote(short source, Task initTask)
  throws DirectoryException
  {
    // TRACER.debugInfo("Entering initializeFromRemote");
    acquireIEContext();
    ieContext.initializeTask = initTask;
@@ -2495,13 +2804,14 @@
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representaing the source
   * @param sourceString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    TRACER.debugInfo("Entering decodeSource");
    short  source = 0;
    Throwable cause = null;
    try
@@ -2512,8 +2822,6 @@
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        log(Message.raw("Source decoded for import:" + source));
        return source;
      }
    }
@@ -2525,11 +2833,15 @@
    ResultCode resultCode = ResultCode.OTHER;
    Message message = ERR_INVALID_IMPORT_SOURCE.get();
    if (cause != null)
    {
      throw new DirectoryException(
          resultCode, message, cause);
    }
    else
    {
      throw new DirectoryException(
          resultCode, message);
    }
  }
  /**
@@ -2600,57 +2912,65 @@
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, Task initTask)
  public void initializeRemote(short target, Task initTask)
  throws DirectoryException
  {
    initializeTarget(target, serverId, initTask);
    initializeRemote(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization has been
   * initiated by another server than this one.
   * specified by the target argument when this initialization specifying the
   * server that requests the initialization.
   *
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, short requestorID, Task initTask)
  public void initializeRemote(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    acquireIEContext();
    ieContext.exportTarget = target;
    if (initTask != null)
    {
      ieContext.initializeTask = initTask;
      ieContext.initTaskCounters(backend.getEntryCount());
    }
    // Send start message to the peer
    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
        baseDN, serverId, ieContext.exportTarget, requestorID,
        backend.getEntryCount());
    log(Message.raw("SD : publishes " + initializeMessage +
        " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount));
    broker.publish(initializeMessage);
    try
    {
      // FIXME Temporary workaround - will probably be fixed when implementing
      // dynamic config
      backend = retrievesBackend(this.baseDN);
      if (!backend.supportsLDIFExport())
      {
        Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
                            backend.getBackendID().toString());
        logError(message);
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      acquireIEContext();
      ieContext.exportTarget = target;
      if (initTask != null)
      {
        ieContext.initializeTask = initTask;
        ieContext.initImportExportCounters(backend.getEntryCount());
      }
      // Send start message to the peer
      InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
          baseDN, serverId, ieContext.exportTarget, requestorID,
          backend.getEntryCount());
      broker.publish(initializeMessage);
      exportBackend();
      // Notify the peer of the success
      DoneMessage doneMsg = new DoneMessage(serverId,
        initializeMessage.getDestination());
          initializeMessage.getDestination());
      broker.publish(doneMsg);
      releaseIEContext();
@@ -2658,7 +2978,9 @@
    catch(DirectoryException de)
    {
      // Notify the peer of the failure
      ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject());
      ErrorMessage errorMsg =
        new ErrorMessage(target,
                         de.getMessageObject());
      broker.publish(errorMsg);
      releaseIEContext();
@@ -2686,8 +3008,9 @@
    StringBuilder failureReason = new StringBuilder();
    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
    {
      Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get(
          backend.getBackendID(), String.valueOf(failureReason));
      Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
                          backend.getBackendID(),
                          String.valueOf(failureReason));
      logError(message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
@@ -2698,14 +3021,22 @@
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void importBackend(InitializeTargetMessage initializeMessage)
  protected void initialize(InitializeTargetMessage initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    if (!backend.supportsLDIFImport())
    {
      Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
                          backend.getBackendID().toString());
      logError(message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    try
    {
      log(Message.raw("startImport"));
      if (initializeMessage.getRequestorID() == serverId)
      {
        // The import responds to a request we did so the IEContext
@@ -2718,7 +3049,7 @@
      ieContext.importSource = initializeMessage.getsenderID();
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initTaskCounters(initializeMessage.getEntryCount());
      ieContext.initImportExportCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend);
@@ -2737,20 +3068,14 @@
      // Process import
      this.backend.importLDIF(importConfig);
      TRACER.debugInfo("The import has ended successfully.");
      stateSavingDisabled = false;
      // Re-exchange state with SS
      broker.stop();
      broker.start(replicationServers);
    }
    catch(Exception e)
    {
      DirectoryException de =
        new DirectoryException(
            ResultCode.OTHER, Message.raw(e.getLocalizedMessage()));
      ieContext.exception = de;
      throw (de);
      de = new DirectoryException(ResultCode.OTHER,
                                  Message.raw(e.getLocalizedMessage()));
    }
    finally
    {
@@ -2766,12 +3091,33 @@
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
      }
      releaseIEContext();
      log(Message.raw("End importBackend"));
      // Retrieves the generation ID associated with the data imported
      try
      {
        generationId = loadGenerationId();
      }
      catch (DirectoryException e)
      {
        logError(ERR_LOADING_GENERATION_ID.get(
            baseDN.toNormalizedString(),
            e.getLocalizedMessage()));
      }
      rejectedGenerationId = -1;
      if (debugEnabled())
        TRACER.debugInfo(
            "After import, the replication plugin restarts connections" +
            " to all RSs to provide new generation ID=" + generationId);
      broker.setGenerationId(generationId);
      // Re-exchange generationID and state with RS
      broker.reStart();
    }
    // Success
    // Sends up the root error.
    if (de != null)
      throw de;
  }
  /**
@@ -2794,7 +3140,6 @@
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    // FIXME setBackendEnabled should be part taskUtils ?
    TaskUtils.enableBackend(backend.getBackendID());
  }
@@ -2988,6 +3333,16 @@
  }
  /**
   * Check if the domain is connected to a ReplicationServer.
   *
   * @return true if the server is connected, false if not.
   */
  public boolean isConnected()
  {
    return broker.isConnected();
  }
  /**
   * Determine whether the connection to the replication server is encrypted.
   * @return true if the connection is encrypted, false otherwise.
   */