From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |  673 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 514 insertions(+), 159 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 788f72d..0724851 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -27,8 +27,6 @@
 package org.opends.server.replication.plugin;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
-
-import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import org.opends.server.loggers.debug.DebugTracer;
@@ -38,12 +36,14 @@
 import static org.opends.server.replication.protocol.OperationContext.*;
 import static org.opends.server.util.StaticUtils.createEntry;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import org.opends.server.protocols.asn1.ASN1OctetString;
 import static org.opends.server.util.ServerConstants.*;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -52,12 +52,14 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CheckedOutputStream;
 import java.util.zip.DataFormatException;
+import java.util.zip.Adler32;
+import java.io.OutputStream;
 
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*;
 import org.opends.server.admin.std.server.MultimasterDomainCfg;
-import org.opends.server.admin.std.server.BackendCfg;
 import org.opends.server.api.AlertGenerator;
 import org.opends.server.api.Backend;
 import org.opends.server.api.DirectoryThread;
@@ -77,6 +79,9 @@
 import org.opends.server.protocols.asn1.ASN1Exception;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPAttribute;
+import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.protocols.ldap.LDAPModification;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.common.ServerState;
@@ -89,6 +94,7 @@
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.Control;
 import org.opends.server.types.DN;
 import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.DirectoryException;
@@ -100,6 +106,7 @@
 import org.opends.server.types.ModificationType;
 import org.opends.server.types.Operation;
 import org.opends.server.types.RDN;
+import org.opends.server.types.RawModification;
 import org.opends.server.types.ResultCode;
 import org.opends.server.types.SearchFilter;
 import org.opends.server.types.SearchResultEntry;
@@ -172,6 +179,9 @@
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
   private int maxSendDelay = 0;
+  private long generationId = -1;
+  private long rejectedGenerationId = -1;
+  private boolean requestedResetSinceLastStart = false;
 
   /**
    * This object is used to store the list of update currently being
@@ -224,7 +234,7 @@
   private int window = 100;
 
   /**
-   * The isoalation policy that this domain is going to use.
+   * The isolation policy that this domain is going to use.
    * This field describes the behavior of the domain when an update is
    * attempted and the domain could not connect to any Replication Server.
    * Possible values are accept-updates or deny-updates, but other values
@@ -254,17 +264,20 @@
 
     // The total entry count expected to be processed
     long entryCount = 0;
-    // The count for the entry left to be processed
+    // The count for the entry not yet processed
     long entryLeftCount = 0;
 
+    boolean checksumOutput = false;
+
     // The exception raised when any
     DirectoryException exception = null;
+    long checksumOutputValue = (long)0;
 
     /**
-     * Initializes the counters of the task with the provider value.
+     * Initializes the import/export counters with the provider value.
      * @param count The value with which to initialize the counters.
      */
-    public void initTaskCounters(long count)
+    public void initImportExportCounters(long count)
     {
       entryCount = count;
       entryLeftCount = count;
@@ -288,7 +301,7 @@
      * Update the counters of the task for each entry processed during
      * an import or export.
      */
-    public void updateTaskCounters()
+    public void updateCounters()
     {
       entryLeftCount--;
 
@@ -342,12 +355,12 @@
     configDn = configuration.dn();
 
     /*
-    * Modify conflicts are solved for all suffixes but the schema suffix
-    * because we don't want to store extra information in the schema
-    * ldif files.
-    * This has no negative impact because the changes on schema should
-    * not produce conflicts.
-    */
+     * Modify conflicts are solved for all suffixes but the schema suffix
+     * because we don't want to store extra information in the schema
+     * ldif files.
+     * This has no negative impact because the changes on schema should
+     * not produce conflicts.
+     */
     if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
     {
       solveConflictFlag = false;
@@ -370,27 +383,33 @@
     monitor = new ReplicationMonitor(this);
     DirectoryServer.registerMonitorProvider(monitor);
 
+    backend = retrievesBackend(baseDN);
+    if (backend == null)
+    {
+      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
+                                  baseDN.toNormalizedString()));
+    }
+
+    try
+    {
+      generationId = loadGenerationId();
+    }
+    catch (DirectoryException e)
+    {
+      logError(ERR_LOADING_GENERATION_ID.get(
+          baseDN.toNormalizedString(), e.getLocalizedMessage()));
+    }
+
     /*
      * create the broker object used to publish and receive changes
      */
     broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
         maxReceiveDelay, maxSendQueue, maxSendDelay, window,
-        heartbeatInterval, new ReplSessionSecurity(configuration));
+        heartbeatInterval, generationId,
+        new ReplSessionSecurity(configuration));
 
     broker.start(replicationServers);
 
-    // Retrieves the related backend and its config entry
-    try
-    {
-      retrievesBackendInfos(baseDN);
-    } catch (DirectoryException e)
-    {
-      // The backend associated to this suffix is not able to
-      // perform export and import.
-      // The replication can continue but this replicationDomain
-      // won't be able to use total update.
-    }
-
     /*
      * ChangeNumberGenerator is used to create new unique ChangeNumbers
      * for each operation done on the replication domain.
@@ -558,7 +577,7 @@
    * If not set the ResultCode and the response message,
    * interrupt the operation, and return false
    *
-   * @param   op  The Operation that needs to be checked.
+   * @param   Operation  The Operation that needs to be checked.
    *
    * @return  true when it OK to process the Operation, false otherwise.
    *          When false is returned the resultCode and the reponse message
@@ -798,7 +817,11 @@
               // The server is in the shutdown process
               return null;
             }
-            log(Message.raw("Broker received message :" + msg));
+
+            if (debugEnabled())
+              if (!(msg instanceof HeartbeatMessage))
+                TRACER.debugInfo("Message received <" + msg + ">");
+
             if (msg instanceof AckMessage)
             {
               AckMessage ack = (AckMessage) msg;
@@ -808,7 +831,7 @@
             {
               // Another server requests us to provide entries
               // for a total update
-              initMsg = (InitializeRequestMessage) msg;
+              initMsg = (InitializeRequestMessage)msg;
             }
             else if (msg instanceof InitializeTargetMessage)
             {
@@ -822,20 +845,19 @@
                 // bunch of entries from the remote server and we
                 // want the import thread to catch them and
                 // not the ListenerThread.
-                importBackend(importMsg);
+                initialize(importMsg);
               }
               catch(DirectoryException de)
               {
+                // Returns an error message to notify the sender
+                ErrorMessage errorMsg =
+                  new ErrorMessage(importMsg.getsenderID(),
+                                   de.getMessageObject());
                 MessageBuilder mb = new MessageBuilder();
                 mb.append(de.getMessageObject());
                 mb.append("Backend ID: ");
                 mb.append(backend.getBackendID());
-                log(mb.toMessage());
-
-                // Return an error message to notify the sender
-                ErrorMessage errorMsg =
-                  new ErrorMessage(importMsg.getsenderID(),
-                                   de.getMessageObject());
+                TRACER.debugInfo(Message.toString(mb.toMessage()));
                 broker.publish(errorMsg);
               }
             }
@@ -850,6 +872,39 @@
                 //  replicationServer did not find any import source.
                 abandonImportExport((ErrorMessage)msg);
               }
+              else
+              {
+                /* We can receive an error message from the replication server
+                 * in the following cases :
+                 * - we connected with an incorrect generation id
+                 */
+                ErrorMessage errorMsg = (ErrorMessage)msg;
+                logError(ERR_ERROR_MSG_RECEIVED.get(
+                           errorMsg.getDetails()));
+
+                if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId())
+                {
+                  TRACER.debugInfo("requestedResetSinceLastStart=" +
+                             requestedResetSinceLastStart +
+                            "rejectedGenerationId=" + rejectedGenerationId);
+
+                  if (requestedResetSinceLastStart && (rejectedGenerationId>0))
+                  {
+                    // When the last generation presented was refused and we are
+                    // the 'reseter' server then restart automatically to become
+                    // the 'master'
+                    state.clear();
+                    rejectedGenerationId = -1;
+                    requestedResetSinceLastStart = false;
+                    broker.stop();
+                    broker.start(replicationServers);
+                  }
+                }
+                if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId())
+                {
+                  rejectedGenerationId = generationId;
+                }
+              }
             }
             else if (msg instanceof UpdateMessage)
             {
@@ -875,7 +930,7 @@
         {
           try
           {
-            initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
                 null);
           }
           catch(DirectoryException de)
@@ -2100,7 +2155,7 @@
   public void disable()
   {
     state.save();
-    state.clear();
+    state.clearInMemory();
     disabled = true;
     //  stop the listener threads
     for (ListenerThread thread : synchroThreads)
@@ -2126,20 +2181,252 @@
    * The domain will connect back to a replication Server and
    * will recreate threads to listen for messages from the Sycnhronization
    * server.
+   * The generationId will be retrieved or computed if necessary.
    * The ServerState will also be read again from the local database.
    */
   public void enable()
   {
-    state.clear();
+    state.clearInMemory();
     state.loadState();
     disabled = false;
 
+
+    try
+    {
+      generationId = loadGenerationId();
+    }
+    catch (Exception e)
+    {
+      /* TODO should mark that replicationServer service is
+       * not available, log an error and retry upon timeout
+       * should we stop the modifications ?
+       */
+      logError(ERR_LOADING_GENERATION_ID.get(
+          baseDN.toNormalizedString(), e.getLocalizedMessage()));
+      return;
+    }
+
+    // After an on-line import, the value of the generationId is new
+    // and it is necessary for the broker to send this new value as part
+    // of the serverStart message.
+    broker.setGenerationId(generationId);
+
     broker.start(replicationServers);
 
     createListeners();
   }
 
   /**
+   * Compute the data generationId associated with the current data present
+   * in the backend for this domain.
+   * @return The computed generationId.
+   * @throws DirectoryException When an error occurs.
+   */
+  public long computeGenerationId() throws DirectoryException
+  {
+    long bec = backend.getEntryCount();
+    if (bec<0)
+      backend = this.retrievesBackend(baseDN);
+    bec = backend.getEntryCount();
+    this.acquireIEContext();
+    ieContext.checksumOutput = true;
+    ieContext.entryCount = (bec<1000?bec:1000);
+    ieContext.entryLeftCount = ieContext.entryCount;
+    exportBackend();
+    long genId = ieContext.checksumOutputValue;
+
+    if (debugEnabled())
+      TRACER.debugInfo("Computed generationId: #entries=" + bec +
+               " generationId=" + ieContext.checksumOutputValue);
+    ieContext.checksumOutput = false;
+    this.releaseIEContext();
+    return genId;
+  }
+
+  /**
+   * Returns the generationId set for this domain.
+   *
+   * @return The generationId.
+   */
+  public long getGenerationId()
+  {
+    return generationId;
+  }
+
+  /**
+   * The attribute name used to store the state in the backend.
+   */
+  protected static final String REPLICATION_GENERATION_ID =
+    "ds-sync-generation-id";
+
+  /**
+   * Stores the value of the generationId.
+   * @param generationId The value of the generationId.
+   * @return a ResultCode indicating if the method was successfull.
+   */
+  public ResultCode saveGenerationId(long generationId)
+  {
+    // The generationId is stored in the root entry of the domain.
+    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+
+    ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
+    ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
+    values.add(value);
+
+    LDAPAttribute attr =
+      new LDAPAttribute(REPLICATION_GENERATION_ID, values);
+    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
+    ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
+    mods.add(mod);
+
+    ModifyOperationBasis op =
+      new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
+          InternalClientConnection.nextMessageID(),
+          new ArrayList<Control>(0), asn1BaseDn,
+          mods);
+    op.setInternalOperation(true);
+    op.setSynchronizationOperation(true);
+    op.setDontSynchronize(true);
+
+    op.run();
+
+    ResultCode result = op.getResultCode();
+    if (result != ResultCode.SUCCESS)
+    {
+      Message message = ERR_UPDATING_GENERATION_ID.get(
+                          op.getResultCode().getResultCodeName() + " " +
+                          op.getErrorMessage(),
+                          baseDN.toString());
+      logError(message);
+    }
+    return result;
+  }
+
+
+  /**
+   * Load the GenerationId from the root entry of the domain
+   * from the REPLICATION_GENERATION_ID attribute in database
+   * to memory, or compute it if not found.
+   *
+   * @return generationId The retrieved value of generationId
+   * @throws DirectoryException When an error occurs.
+   */
+  public long loadGenerationId()
+  throws DirectoryException
+  {
+    long generationId=-1;
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+          "Attempt to read generation ID from DB " + baseDN.toString());
+
+    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+    boolean found = false;
+    LDAPFilter filter;
+    try
+    {
+      filter = LDAPFilter.decode("objectclass=*");
+    }
+    catch (LDAPException e)
+    {
+      // can not happen
+      return -1;
+    }
+
+    /*
+     * Search the database entry that is used to periodically
+     * save the ServerState
+     */
+    InternalSearchOperation search = null;
+    LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
+    attributes.add(REPLICATION_GENERATION_ID);
+    search = conn.processSearch(asn1BaseDn,
+        SearchScope.BASE_OBJECT,
+        DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
+        filter,attributes);
+    if (((search.getResultCode() != ResultCode.SUCCESS)) &&
+        ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
+    {
+      Message message = ERR_SEARCHING_GENERATION_ID.get(
+          search.getResultCode().getResultCodeName() + " " +
+          search.getErrorMessage(),
+          baseDN.toString());
+      logError(message);
+    }
+
+    SearchResultEntry resultEntry = null;
+    if (search.getResultCode() == ResultCode.SUCCESS)
+    {
+      LinkedList<SearchResultEntry> result = search.getSearchEntries();
+      resultEntry = result.getFirst();
+      if (resultEntry != null)
+      {
+        AttributeType synchronizationGenIDType =
+          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
+        List<Attribute> attrs =
+          resultEntry.getAttribute(synchronizationGenIDType);
+        if (attrs != null)
+        {
+          Attribute attr = attrs.get(0);
+          LinkedHashSet<AttributeValue> values = attr.getValues();
+          if (values.size()!=1)
+          {
+            Message message = ERR_LOADING_GENERATION_ID.get(
+                baseDN.toString(), "#Values != 1");
+            logError(message);
+          }
+          else
+          {
+            found=true;
+            try
+            {
+              generationId = Long.decode(values.iterator().next().
+                  getStringValue());
+            }
+            catch(Exception e)
+            {
+              Message message = ERR_LOADING_GENERATION_ID.get(
+                baseDN.toString(), e.getLocalizedMessage());
+              logError(message);
+            }
+          }
+        }
+      }
+    }
+
+    if (!found)
+    {
+      generationId = computeGenerationId();
+      saveGenerationId(generationId);
+
+      if (debugEnabled())
+        TRACER.debugInfo("Generation ID created for domain base DN=" +
+            baseDN.toString() +
+            " generationId=" + generationId);
+    }
+    else
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "Generation ID successfully read from domain base DN=" + baseDN +
+            " generationId=" + generationId);
+    }
+    return generationId;
+  }
+
+  /**
+   * Reset the generationId of this domain in the whole topology.
+   * A message is sent to the Replication Servers for them to reset
+   * their change dbs.
+   */
+  public void resetGenerationId()
+  {
+    requestedResetSinceLastStart = true;
+    ResetGenerationId genIdMessage = new ResetGenerationId();
+    broker.publish(genIdMessage);
+  }
+
+  /**
    * Do whatever is needed when a backup is started.
    * We need to make sure that the serverState is correclty save.
    */
@@ -2175,18 +2462,20 @@
       {
         msg = broker.receive();
 
+        if (debugEnabled())
+          TRACER.debugInfo("Import: EntryBytes received " + msg);
         if (msg == null)
         {
           // The server is in the shutdown process
           return null;
         }
-        log(Message.raw("receiveEntryBytes: received " + msg));
+
         if (msg instanceof EntryMessage)
         {
           // FIXME
           EntryMessage entryMsg = (EntryMessage)msg;
           byte[] entryBytes = entryMsg.getEntryBytes().clone();
-          ieContext.updateTaskCounters();
+          ieContext.updateCounters();
           return entryBytes;
         }
         else if (msg instanceof DoneMessage)
@@ -2202,8 +2491,9 @@
           // The error is stored and the import is ended
           // by returning null
           ErrorMessage errorMsg = (ErrorMessage)msg;
-          ieContext.exception = new DirectoryException(ResultCode.OTHER,
-              errorMsg.getDetails());
+          ieContext.exception = new DirectoryException(
+                                      ResultCode.OTHER,
+                                      errorMsg.getDetails());
           return null;
         }
         else
@@ -2213,9 +2503,10 @@
       }
       catch(Exception e)
       {
+        // TODO: i18n
         ieContext.exception = new DirectoryException(ResultCode.OTHER,
-            Message.raw("received an unexpected message type"), e);
-        return null;
+            Message.raw("received an unexpected message type" +
+                e.getLocalizedMessage()));
       }
     }
   }
@@ -2299,26 +2590,17 @@
   }
 
   /**
-   * Log debug message.
-   * @param message The message to log.
+   * Export the entries from the backend.
+   * The ieContext must have been set before calling.
+   *
+   * @throws DirectoryException when an error occured
    */
-  private void log(Message message)
-  {
-    if (debugEnabled())
-    {
-      TRACER.debugInfo("DebugInfo" + message);
-    }
-  }
-
-  /**
-   * Export the entries.
-   * @throws DirectoryException when an error occurred
-   */
-  protected void exportBackend() throws DirectoryException
+  protected void exportBackend()
+  throws DirectoryException
   {
     // FIXME Temporary workaround - will probably be fixed when implementing
     // dynamic config
-    retrievesBackendInfos(this.baseDN);
+    backend = retrievesBackend(this.baseDN);
 
     //  Acquire a shared lock for the backend.
     try
@@ -2344,13 +2626,53 @@
           ResultCode.OTHER, message, null);
     }
 
-    ReplLDIFOutputStream os = new ReplLDIFOutputStream(this);
+    OutputStream os;
+    ReplLDIFOutputStream ros;
 
+    if (ieContext.checksumOutput)
+    {
+      ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
+      os = new CheckedOutputStream(ros, new Adler32());
+      try
+      {
+        os.write((Long.toString(ieContext.entryCount)).getBytes());
+      }
+      catch(Exception e)
+      {
+        // Should never happen
+      }
+    }
+    else
+    {
+      ros = new ReplLDIFOutputStream(this, (short)-1);
+      os = ros;
+    }
     LDIFExportConfig exportConfig = new LDIFExportConfig(os);
+
+    // baseDN branch is the only one included in the export
     List<DN> includeBranches = new ArrayList<DN>(1);
     includeBranches.add(this.baseDN);
     exportConfig.setIncludeBranches(includeBranches);
 
+    // For the checksum computing mode, only consider the 'stable' attributes
+    if (ieContext.checksumOutput)
+    {
+      String includeAttributeStrings[] =
+        {"objectclass", "sn", "cn", "entryuuid"};
+      HashSet<AttributeType> includeAttributes;
+      includeAttributes = new HashSet<AttributeType>();
+      for (String attrName : includeAttributeStrings)
+      {
+        AttributeType attrType  = DirectoryServer.getAttributeType(attrName);
+        if (attrType == null)
+        {
+          attrType = DirectoryServer.getDefaultAttributeType(attrName);
+        }
+        includeAttributes.add(attrType);
+      }
+      exportConfig.setIncludeAttributes(includeAttributes);
+    }
+
     //  Launch the export.
     try
     {
@@ -2374,8 +2696,19 @@
     }
     finally
     {
-      //  Clean up after the export by closing the export config.
-      exportConfig.close();
+
+      if ((ieContext != null) && (ieContext.checksumOutput))
+      {
+        ieContext.checksumOutputValue =
+         ((CheckedOutputStream)os).getChecksum().getValue();
+      }
+      else
+      {
+        // Clean up after the export by closing the export config.
+        // Will also flush the export and export the remaining entries.
+        // This is a real export where writer has been initialized.
+        exportConfig.close();
+      }
 
       //  Release the shared lock on the backend.
       try
@@ -2403,54 +2736,26 @@
   }
 
   /**
-   * Retrieves the backend object related to the domain and the backend's
-   * config entry. They will be used for import and export.
-   * TODO This should be in a shared package rather than here.
+   * Retrieves the backend related to the domain.
    *
+   * @return The backend of that domain.
    * @param baseDN The baseDN to retrieve the backend
-   * @throws DirectoryException when an error occired
    */
-  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
+  protected Backend retrievesBackend(DN baseDN)
   {
     // Retrieves the backend related to this domain
-    Backend domainBackend = DirectoryServer.getBackend(baseDN);
-    if (domainBackend == null)
-    {
-      Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, "");
-      throw new DirectoryException(
-          ResultCode.OTHER, message, null);
-    }
-
-    // Retrieves its configuration
-    BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend);
-    if (backendCfg == null)
-    {
-      Message message =
-          ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get();
-      logError(message);
-      throw new DirectoryException(
-          ResultCode.OTHER, message, null);
-    }
-
-    this.backend = domainBackend;
-    if (! domainBackend.supportsLDIFImport())
-    {
-      Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get(
-              String.valueOf(baseDN));
-      logError(message);
-      throw new DirectoryException(
-          ResultCode.OTHER, message, null);
-    }
+    return DirectoryServer.getBackend(baseDN);
   }
 
 
   /**
-   * Sends lDIFEntry entry lines to the export target currently set.
+   * Exports an entry in LDIF format.
    *
-   * @param lDIFEntry The lines for the LDIF entry.
+   * @param  lDIFEntry The entry to be exported..
+   *
    * @throws IOException when an error occurred.
    */
-  public void sendEntryLines(String lDIFEntry) throws IOException
+  public void exportLDIFEntry(String lDIFEntry) throws IOException
   {
     // If an error was raised - like receiving an ErrorMessage
     // we just let down the export.
@@ -2461,12 +2766,14 @@
       throw ioe;
     }
 
-    // new entry then send the current one
-    EntryMessage entryMessage = new EntryMessage(
+    if (ieContext.checksumOutput == false)
+    {
+      // Actually send the entry
+      EntryMessage entryMessage = new EntryMessage(
         serverId, ieContext.exportTarget, lDIFEntry.getBytes());
-    broker.publish(entryMessage);
-
-    ieContext.updateTaskCounters();
+      broker.publish(entryMessage);
+    }
+    ieContext.updateCounters();
   }
 
   /**
@@ -2477,9 +2784,11 @@
    *                 and should be updated of its progress.
    * @throws DirectoryException when an error occurs
    */
-  public void initialize(short source, Task initTask)
+  public void initializeFromRemote(short source, Task initTask)
   throws DirectoryException
   {
+    // TRACER.debugInfo("Entering initializeFromRemote");
+
     acquireIEContext();
     ieContext.initializeTask = initTask;
 
@@ -2495,13 +2804,14 @@
   /**
    * Verifies that the given string represents a valid source
    * from which this server can be initialized.
-   * @param sourceString The string representaing the source
+   * @param sourceString The string representing the source
    * @return The source as a short value
    * @throws DirectoryException if the string is not valid
    */
   public short decodeSource(String sourceString)
   throws DirectoryException
   {
+    TRACER.debugInfo("Entering decodeSource");
     short  source = 0;
     Throwable cause = null;
     try
@@ -2512,8 +2822,6 @@
         // TODO Verifies serverID is in the domain
         // We shold check here that this is a server implied
         // in the current domain.
-
-        log(Message.raw("Source decoded for import:" + source));
         return source;
       }
     }
@@ -2525,11 +2833,15 @@
     ResultCode resultCode = ResultCode.OTHER;
     Message message = ERR_INVALID_IMPORT_SOURCE.get();
     if (cause != null)
+    {
       throw new DirectoryException(
           resultCode, message, cause);
+    }
     else
+    {
       throw new DirectoryException(
           resultCode, message);
+    }
   }
 
   /**
@@ -2600,57 +2912,65 @@
    * @param target The target that should be initialized
    * @param initTask The task that triggers this initialization and that should
    *                 be updated with its progress.
+   *
    * @exception DirectoryException When an error occurs.
    */
-  public void initializeTarget(short target, Task initTask)
+  public void initializeRemote(short target, Task initTask)
   throws DirectoryException
   {
-    initializeTarget(target, serverId, initTask);
+    initializeRemote(target, serverId, initTask);
   }
 
   /**
    * Process the initialization of some other server or servers in the topology
-   * specified by the target argument when this initialization has been
-   * initiated by another server than this one.
+   * specified by the target argument when this initialization specifying the
+   * server that requests the initialization.
+   *
    * @param target The target that should be initialized.
    * @param requestorID The server that initiated the export.
    * @param initTask The task that triggers this initialization and that should
    *  be updated with its progress.
+   *
    * @exception DirectoryException When an error occurs.
    */
-  public void initializeTarget(short target, short requestorID, Task initTask)
+  public void initializeRemote(short target, short requestorID, Task initTask)
   throws DirectoryException
   {
-    // FIXME Temporary workaround - will probably be fixed when implementing
-    // dynamic config
-    retrievesBackendInfos(this.baseDN);
-
-    acquireIEContext();
-
-    ieContext.exportTarget = target;
-    if (initTask != null)
-    {
-      ieContext.initializeTask = initTask;
-      ieContext.initTaskCounters(backend.getEntryCount());
-    }
-
-    // Send start message to the peer
-    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
-        baseDN, serverId, ieContext.exportTarget, requestorID,
-        backend.getEntryCount());
-
-    log(Message.raw("SD : publishes " + initializeMessage +
-        " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount));
-
-    broker.publish(initializeMessage);
-
     try
     {
+      // FIXME Temporary workaround - will probably be fixed when implementing
+      // dynamic config
+      backend = retrievesBackend(this.baseDN);
+
+      if (!backend.supportsLDIFExport())
+      {
+        Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
+                            backend.getBackendID().toString());
+        logError(message);
+        throw new DirectoryException(ResultCode.OTHER, message);
+      }
+
+      acquireIEContext();
+
+      ieContext.exportTarget = target;
+      if (initTask != null)
+      {
+        ieContext.initializeTask = initTask;
+        ieContext.initImportExportCounters(backend.getEntryCount());
+      }
+
+      // Send start message to the peer
+      InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
+          baseDN, serverId, ieContext.exportTarget, requestorID,
+          backend.getEntryCount());
+
+      broker.publish(initializeMessage);
+
       exportBackend();
 
       // Notify the peer of the success
       DoneMessage doneMsg = new DoneMessage(serverId,
-        initializeMessage.getDestination());
+          initializeMessage.getDestination());
       broker.publish(doneMsg);
 
       releaseIEContext();
@@ -2658,7 +2978,9 @@
     catch(DirectoryException de)
     {
       // Notify the peer of the failure
-      ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject());
+      ErrorMessage errorMsg =
+        new ErrorMessage(target,
+                         de.getMessageObject());
       broker.publish(errorMsg);
 
       releaseIEContext();
@@ -2686,8 +3008,9 @@
     StringBuilder failureReason = new StringBuilder();
     if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
     {
-      Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get(
-          backend.getBackendID(), String.valueOf(failureReason));
+      Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
+                          backend.getBackendID(),
+                          String.valueOf(failureReason));
       logError(message);
       throw new DirectoryException(ResultCode.OTHER, message);
     }
@@ -2698,14 +3021,22 @@
    * @param initializeMessage The message that initiated the import.
    * @exception DirectoryException Thrown when an error occurs.
    */
-  protected void importBackend(InitializeTargetMessage initializeMessage)
+  protected void initialize(InitializeTargetMessage initializeMessage)
   throws DirectoryException
   {
     LDIFImportConfig importConfig = null;
+    DirectoryException de = null;
+
+    if (!backend.supportsLDIFImport())
+    {
+      Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
+                          backend.getBackendID().toString());
+      logError(message);
+      throw new DirectoryException(ResultCode.OTHER, message);
+    }
+
     try
     {
-      log(Message.raw("startImport"));
-
       if (initializeMessage.getRequestorID() == serverId)
       {
         // The import responds to a request we did so the IEContext
@@ -2718,7 +3049,7 @@
 
       ieContext.importSource = initializeMessage.getsenderID();
       ieContext.entryLeftCount = initializeMessage.getEntryCount();
-      ieContext.initTaskCounters(initializeMessage.getEntryCount());
+      ieContext.initImportExportCounters(initializeMessage.getEntryCount());
 
       preBackendImport(this.backend);
 
@@ -2737,20 +3068,14 @@
       // Process import
       this.backend.importLDIF(importConfig);
 
+      TRACER.debugInfo("The import has ended successfully.");
       stateSavingDisabled = false;
 
-      // Re-exchange state with SS
-      broker.stop();
-      broker.start(replicationServers);
-
     }
     catch(Exception e)
     {
-      DirectoryException de =
-        new DirectoryException(
-            ResultCode.OTHER, Message.raw(e.getLocalizedMessage()));
-      ieContext.exception = de;
-      throw (de);
+      de = new DirectoryException(ResultCode.OTHER,
+                                  Message.raw(e.getLocalizedMessage()));
     }
     finally
     {
@@ -2766,12 +3091,33 @@
         ((InitializeTask)ieContext.initializeTask).
         setState(ieContext.updateTaskCompletionState(),ieContext.exception);
       }
-
       releaseIEContext();
 
-      log(Message.raw("End importBackend"));
+      // Retrieves the generation ID associated with the data imported
+      try
+      {
+        generationId = loadGenerationId();
+      }
+      catch (DirectoryException e)
+      {
+        logError(ERR_LOADING_GENERATION_ID.get(
+            baseDN.toNormalizedString(),
+            e.getLocalizedMessage()));
+      }
+      rejectedGenerationId = -1;
+
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "After import, the replication plugin restarts connections" +
+            " to all RSs to provide new generation ID=" + generationId);
+      broker.setGenerationId(generationId);
+
+      // Re-exchange generationID and state with RS
+      broker.reStart();
     }
-    // Success
+    // Sends up the root error.
+    if (de != null)
+      throw de;
   }
 
   /**
@@ -2794,7 +3140,6 @@
       throw new DirectoryException(ResultCode.OTHER, message);
     }
 
-    // FIXME setBackendEnabled should be part taskUtils ?
     TaskUtils.enableBackend(backend.getBackendID());
   }
 
@@ -2988,6 +3333,16 @@
   }
 
   /**
+   * Check if the domain is connected to a ReplicationServer.
+   *
+   * @return true if the server is connected, false if not.
+   */
+  public boolean isConnected()
+  {
+    return broker.isConnected();
+  }
+
+  /**
    * Determine whether the connection to the replication server is encrypted.
    * @return true if the connection is encrypted, false otherwise.
    */

--
Gitblit v1.10.0