From d90a03781f4833f726597d735b79b49073940d75 Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Fri, 06 Nov 2015 17:46:01 +0000
Subject: [PATCH] OPENDJ-1937 Replication draft change log changeNumber attribute should be synchronized with other RSs during initialization

---
 opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java |  726 +++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 467 insertions(+), 259 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java b/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
index 8c5494f..c612fad 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
@@ -36,7 +36,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
@@ -49,6 +48,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -57,13 +57,12 @@
 import javax.naming.NamingEnumeration;
 import javax.naming.NamingException;
 import javax.naming.NoPermissionException;
-import javax.naming.directory.Attribute;
-import javax.naming.directory.BasicAttribute;
 import javax.naming.directory.BasicAttributes;
 import javax.naming.directory.DirContext;
 import javax.naming.directory.SearchControls;
 import javax.naming.directory.SearchResult;
 import javax.naming.ldap.InitialLdapContext;
+import javax.naming.ldap.LdapName;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
@@ -112,6 +111,8 @@
 import org.opends.server.tools.tasks.TaskScheduleInteraction;
 import org.opends.server.tools.tasks.TaskScheduleUserData;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.NullOutputStream;
 import org.opends.server.types.OpenDsException;
@@ -147,6 +148,7 @@
 import static com.forgerock.opendj.cli.Utils.*;
 import static com.forgerock.opendj.util.OperatingSystem.*;
 
+import static java.util.Collections.*;
 import static org.forgerock.util.Utils.*;
 import static org.opends.admin.ads.util.ConnectionUtils.*;
 import static org.opends.admin.ads.util.PreferredConnection.*;
@@ -220,6 +222,9 @@
     STATUS(STATUS_REPLICATION_SUBCMD_NAME, INFO_REPLICATION_STATUS_MENU_PROMPT.get()),
     /** Replication purge historical. */
     PURGE_HISTORICAL(PURGE_HISTORICAL_SUBCMD_NAME, INFO_REPLICATION_PURGE_HISTORICAL_MENU_PROMPT.get()),
+    /** Set changelog change number from another server. */
+    RESET_CHANGE_NUMBER(ReplicationCliArgumentParser.RESET_CHANGE_NUMBER_SUBCMD_NAME,
+        INFO_DESCRIPTION_RESET_CHANGE_NUMBER.get()),
     /** Cancel operation. */
     CANCEL(null, null);
 
@@ -256,6 +261,34 @@
     }
   }
 
+  /** Abstract some of the operations when two servers must be queried for information. */
+  private interface OperationBetweenSourceAndDestinationServers
+  {
+    /**
+     * Returns whether we should stop processing after asking the user for additional information.
+     * Might connect to servers to run configuration checks.
+     * @param baseDNs user specified baseDNs
+     * @param source the source server
+     * @param dest the destination server
+     * @param interactive if user has to input information
+     * @return whether we should stop
+     */
+    boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source, InitialLdapContext dest,
+        boolean interactive);
+
+    /**
+     * Confirm with the user whether the current task should continue.
+     *
+     * @param uData servers address and authentication parameters
+     * @param ctxSource LDAP Context for the destination server
+     * @param ctxDestination LDAP Context for the source server
+     * @param defaultValue default yes or no
+     * @return whether the current task should be interrupted
+     */
+    boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
+        InitialLdapContext ctxDestination, final boolean defaultValue);
+  }
+
   /** The argument parser to be used. */
   private ReplicationCliArgumentParser argParser;
   private FileBasedArgument userProvidedAdminPwdFile;
@@ -527,6 +560,8 @@
       return statusReplication();
     case PURGE_HISTORICAL:
       return purgeHistorical();
+    case RESET_CHANGE_NUMBER:
+      return resetChangeNumber();
     default:
       return SUCCESSFUL_NOP;
     }
@@ -543,15 +578,7 @@
    */
   private String askForAdministratorUID(String defaultValue, LocalizedLogger logger)
   {
-    try
-    {
-      return readInput(INFO_ADMINISTRATOR_UID_PROMPT.get(), defaultValue);
-    }
-    catch (ClientException ce)
-    {
-      logger.warn(LocalizableMessage.raw("Error reading input: " + ce, ce));
-      return defaultValue;
-    }
+    return ask(logger, INFO_ADMINISTRATOR_UID_PROMPT.get(), defaultValue);
   }
 
   /**
@@ -574,6 +601,19 @@
     }
   }
 
+  private String ask(LocalizedLogger logger, LocalizableMessage msgPrompt, String defaultValue)
+  {
+    try
+    {
+      return readInput(msgPrompt, defaultValue);
+    }
+    catch (ClientException ce)
+    {
+      logger.warn(LocalizableMessage.raw("Error reading input: " + ce, ce));
+      return defaultValue;
+    }
+  }
+
   /**
    * Commodity method used to repeatidly ask the user to provide an integer
    * value.
@@ -1316,6 +1356,219 @@
     }
   }
 
+  private ReplicationCliReturnCode resetChangeNumber()
+  {
+    final String changeNumber;
+    final SourceDestinationServerUserData uData = new SourceDestinationServerUserData();
+
+    if (!argParser.isInteractive())
+    {
+      initializeWithArgParser(uData);
+      return resetChangeNumber(uData);
+    }
+    OperationBetweenSourceAndDestinationServers
+        resetChangeNumberOperations = new OperationBetweenSourceAndDestinationServers()
+    {
+      @Override
+      public boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source,
+          InitialLdapContext dest, boolean interactive)
+      {
+        TopologyCacheFilter filter = new TopologyCacheFilter();
+        filter.setSearchMonitoringInformation(false);
+
+        if (!argParser.resetChangeNumber.isPresent())
+        {
+          String cn = getNewestChangeNumber(source);
+          if (cn.isEmpty())
+          {
+            return true;
+          }
+          argParser.setResetChangeNumber(
+              ask(logger, INFO_RESET_CHANGE_NUMBER_TO.get(uData.getSource(), uData.getDestination()), cn));
+        }
+        return false;
+      }
+
+      @Override
+      public boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
+          InitialLdapContext ctxDestination, boolean defaultValue)
+      {
+        return !askConfirmation(INFO_RESET_CHANGE_NUMBER_CONFIRM_RESET.get(uData.getDestinationHostPort()),
+            defaultValue);
+      }
+    };
+
+    return promptIfRequired(uData, resetChangeNumberOperations) ? resetChangeNumber(uData) : USER_CANCELLED;
+  }
+
+  private ReplicationCliReturnCode resetChangeNumber(SourceDestinationServerUserData uData)
+  {
+
+    InitialLdapContext ctxSource = createAdministrativeContext(uData, uData.getSource());
+    InitialLdapContext ctxDest = createAdministrativeContext(uData, uData.getDestination());
+    if (!getCommonSuffixes(ctxSource, ctxDest, SuffixRelationType.NOT_FULLY_REPLICATED).isEmpty())
+    {
+      errPrintln(ERROR_RESET_CHANGE_NUMBER_SERVERS_BASEDNS_DIFFER.get(uData.getSourceHostPort(),
+          uData.getDestinationHostPort()));
+      return ERROR_RESET_CHANGE_NUMBER_BASEDNS_SHOULD_EQUAL;
+    }
+    if (mustPrintCommandBuilder())
+    {
+      printNewCommandBuilder(RESET_CHANGE_NUMBER_SUBCMD_NAME, uData);
+    }
+    try
+    {
+      String newStartCN;
+      if (argParser.resetChangeNumber.isPresent())
+      {
+        newStartCN = String.valueOf(argParser.getResetChangeNumber());
+      }
+      else
+      {
+        newStartCN = getNewestChangeNumber(ctxSource);
+        if (newStartCN.isEmpty())
+        {
+          return ERROR_UNKNOWN_CHANGE_NUMBER;
+        }
+        argParser.setResetChangeNumber(newStartCN);
+      }
+      SearchControls ctls = new SearchControls();
+      ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);
+      ctls.setReturningAttributes(
+          new String[] {
+              "changeNumber",
+              "replicationCSN",
+              "targetDN"
+          });
+      NamingEnumeration<SearchResult> listeners = ctxSource.search(new LdapName("cn=changelog"),
+          "(changeNumber=" + newStartCN + ")", ctls);
+      if (!listeners.hasMore())
+      {
+        errPrintln(ERROR_RESET_CHANGE_NUMBER_UNKNOWN_NUMBER.get(newStartCN, uData.getSourceHostPort()));
+        return ERROR_UNKNOWN_CHANGE_NUMBER;
+      }
+      SearchResult sr = listeners.next();
+      String newStartCSN = getFirstValue(sr, "replicationCSN");
+      if (newStartCSN == null)
+      {
+        errPrintln(ERROR_RESET_CHANGE_NUMBER_NO_CSN_FOUND.get(newStartCN, uData.getSourceHostPort()));
+        return ERROR_RESET_CHANGE_NUMBER_NO_CSN;
+      }
+      String targetDN = getFirstValue(sr, "targetDN");
+      DN targetBaseDN = DN.rootDN();
+      try
+      {
+        for (String adn : getCommonSuffixes(ctxSource, ctxDest, SuffixRelationType.REPLICATED))
+        {
+          DN dn = DN.valueOf(adn);
+          if (DN.valueOf(targetDN).isDescendantOf(dn) && dn.isDescendantOf(targetBaseDN))
+          {
+            targetBaseDN = dn;
+          }
+        }
+      }
+      catch (DirectoryException de)
+      {
+        errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(de.getLocalizedMessage()));
+        return ERROR_RESET_CHANGE_NUMBER_PROBLEM;
+      }
+      if (targetBaseDN.isRootDN())
+      {
+        errPrintln(ERROR_RESET_CHANGE_NUMBER_NO_BASEDN.get(newStartCN, targetDN, newStartCSN));
+        return ERROR_RESET_CHANGE_NUMBER_UNKNOWN_BASEDN;
+      }
+      logger.info(INFO_RESET_CHANGE_NUMBER_INFO.get(uData.getDestinationHostPort(),
+          newStartCN, newStartCSN, targetBaseDN.toString()));
+      Map<String, String> taskAttrs = new TreeMap<>();
+      taskAttrs.put("ds-task-reset-change-number-to", newStartCN);
+      taskAttrs.put("ds-task-reset-change-number-csn", newStartCSN);
+      taskAttrs.put("ds-task-reset-change-number-base-dn", targetBaseDN.toString());
+      String taskDN = createServerTask(ctxDest,
+          "ds-task-reset-change-number", "org.opends.server.tasks.ResetChangeNumberTask", "dsreplication-reset-cn",
+          taskAttrs);
+      waitUntilResetChangeNumberTaskEnds(ctxDest, taskDN);
+      return SUCCESSFUL;
+    }
+    catch (ReplicationCliException | NamingException | NullPointerException e)
+    {
+      errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(e.getLocalizedMessage()));
+      return ERROR_RESET_CHANGE_NUMBER_PROBLEM;
+    }
+  }
+
+  private String getNewestChangeNumber(InitialLdapContext source)
+  {
+    try
+    {
+      SearchControls ctls = new SearchControls();
+      ctls.setSearchScope(SearchControls.OBJECT_SCOPE);
+      ctls.setReturningAttributes(new String[] {"lastChangeNumber"});
+      NamingEnumeration<SearchResult> results = source.search(new LdapName(""), "objectclass=*", ctls);
+      if (results.hasMore()) {
+        return getFirstValue(results.next(), "lastChangeNumber");
+      }
+    }
+    catch (NamingException e)
+    {
+      errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(e.getLocalizedMessage()));
+    }
+
+    return "";
+  }
+
+  private void waitUntilResetChangeNumberTaskEnds(InitialLdapContext server, String taskDN)
+      throws ReplicationCliException
+  {
+    String lastLogMsg = null;
+    while (true)
+    {
+      sleepCatchInterrupt(500);
+      try
+      {
+        SearchResult sr = getLastSearchResult(server, taskDN, "ds-task-log-message", "ds-task-state" );
+        String logMsg = getFirstValue(sr, "ds-task-log-message");
+        if (logMsg != null && !logMsg.equals(lastLogMsg))
+        {
+          logger.info(LocalizableMessage.raw(logMsg));
+          lastLogMsg = logMsg;
+        }
+        InstallerHelper helper = new InstallerHelper();
+        String state = getFirstValue(sr, "ds-task-state");
+
+        if (helper.isDone(state) || helper.isStoppedByError(state))
+        {
+          LocalizableMessage errorMsg = ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
+
+          if (helper.isCompletedWithErrors(state))
+          {
+            logger.warn(LocalizableMessage.raw("Completed with error: " + errorMsg));
+            errPrintln(errorMsg);
+          }
+          else if (!helper.isSuccessful(state) || helper.isStoppedByError(state))
+          {
+            logger.warn(LocalizableMessage.raw("Error: " + errorMsg));
+            throw new ReplicationCliException(errorMsg, ERROR_LAUNCHING_RESET_CHANGE_NUMBER, null);
+          }
+          else
+          {
+            print(INFO_RESET_CHANGE_NUMBER_TASK_FINISHED.get());
+            println();
+          }
+          return;
+        }
+      }
+      catch (NameNotFoundException x)
+      {
+        return;
+      }
+      catch (NamingException ne)
+      {
+        throw new ReplicationCliException(getThrowableMsg(ERR_READING_SERVER_TASK_PROGRESS.get(), ne),
+            ERROR_CONNECTING, ne);
+      }
+    }
+  }
+
   private InitialLdapContext createAdministrativeContext(MonoServerReplicationUserData uData)
   {
     final String bindDn = getAdministratorDN(uData.getAdminUid());
@@ -1411,40 +1664,22 @@
             getThrowableMsg(msg, ne), code, ne);
       }
     }
-    // Wait until it is over
-    SearchControls searchControls = new SearchControls();
-    searchControls.setCountLimit(1);
-    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
-    searchControls.setReturningAttributes(
-        new String[] {
-            "ds-task-log-message",
-            "ds-task-state",
-            "ds-task-purge-conflicts-historical-purged-values-count",
-            "ds-task-purge-conflicts-historical-purge-completed-in-time",
-            "ds-task-purge-conflicts-historical-purge-completed-in-time",
-            "ds-task-purge-conflicts-historical-last-purged-changenumber"
-        });
-    String filter = "objectclass=*";
-    String lastLogMsg = null;
 
     // Polling only makes sense when we are recurrently scheduling a task
     // or the task is being executed now.
+    String lastLogMsg = null;
     while (!isOver && uData.getTaskSchedule().getStartDate() == null)
     {
       sleepCatchInterrupt(500);
       try
       {
-        NamingEnumeration<SearchResult> res =
-          ctx.search(dn, filter, searchControls);
-        SearchResult sr = null;
-        try
-        {
-          sr = res.next();
-        }
-        finally
-        {
-          res.close();
-        }
+        SearchResult sr = getFirstSearchResult(ctx, dn,
+            "ds-task-log-message",
+            "ds-task-state",
+            "ds-task-purge-conflicts-historical-purged-values-count",
+            "ds-task-purge-conflicts-historical-purge-completed-in-time",
+            "ds-task-purge-conflicts-historical-purge-completed-in-time",
+            "ds-task-purge-conflicts-historical-last-purged-changenumber");
         String logMsg = getFirstValue(sr, "ds-task-log-message");
         if (logMsg != null && !logMsg.equals(lastLogMsg))
         {
@@ -1479,7 +1714,7 @@
       }
       catch (NamingException ne)
       {
-        LocalizableMessage msg = ERR_POOLING_PURGE_HISTORICAL.get();
+        LocalizableMessage msg = ERR_READING_SERVER_TASK_PROGRESS.get();
         throw new ReplicationCliException(
           getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
       }
@@ -1492,14 +1727,34 @@
     return returnCode;
   }
 
+  private SearchResult getFirstSearchResult(InitialLdapContext ctx, String dn, String... returnedAttributes)
+      throws NamingException
+  {
+    SearchControls searchControls = new SearchControls();
+    searchControls.setCountLimit(1);
+    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
+    searchControls.setReturningAttributes(returnedAttributes);
+    NamingEnumeration<SearchResult> res = ctx.search(dn, "objectclass=*", searchControls);
+    try
+    {
+      SearchResult sr = null;
+      sr = res.next();
+      return sr;
+    }
+    finally
+    {
+      res.close();
+    }
+  }
+
   private LocalizableMessage getPurgeErrorMsg(String lastLogMsg, String state, InitialLdapContext ctx)
   {
     String server = getHostPort(ctx);
     if (lastLogMsg != null)
     {
-      return INFO_ERROR_DURING_PURGE_HISTORICAL_LOG.get(lastLogMsg, state, server);
+      return ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
     }
-    return INFO_ERROR_DURING_PURGE_HISTORICAL_NO_LOG.get(state, server);
+    return ERR_UNEXPECTED_DURING_TASK_NO_LOG.get(state, server);
   }
 
   /**
@@ -1556,7 +1811,7 @@
         }
         replica.setBackendName(backend.getBackendID());
         replica.setSuffix(suffix);
-        suffix.setReplicas(Collections.singleton(replica));
+        suffix.setReplicas(singleton(replica));
 
         replicas.add(replica);
       }
@@ -1685,23 +1940,32 @@
    */
   private ReplicationCliReturnCode initializeReplication()
   {
-    InitializeReplicationUserData uData = new InitializeReplicationUserData();
-    if (argParser.isInteractive())
-    {
-      if (promptIfRequired(uData))
-      {
-        return initializeReplication(uData);
-      }
-      else
-      {
-        return USER_CANCELLED;
-      }
-    }
-    else
+    SourceDestinationServerUserData uData = new SourceDestinationServerUserData();
+    if (!argParser.isInteractive())
     {
       initializeWithArgParser(uData);
       return initializeReplication(uData);
     }
+
+    OperationBetweenSourceAndDestinationServers
+        initializeReplicationOperations = new OperationBetweenSourceAndDestinationServers()
+    {
+      @Override
+      public boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source,
+          InitialLdapContext dest, boolean interactive)
+      {
+        checkSuffixesForInitializeReplication(baseDNs, source, dest, interactive);
+        return baseDNs.isEmpty();
+      }
+
+      @Override
+      public boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
+          InitialLdapContext ctxDestination, boolean defaultValue)
+      {
+        return !askConfirmation(getInitializeReplicationPrompt(uData, ctxSource, ctxDestination), defaultValue);
+      }
+    };
+    return promptIfRequired(uData, initializeReplicationOperations) ? initializeReplication(uData) : USER_CANCELLED;
   }
 
   /**
@@ -2782,7 +3046,7 @@
       {
         if (uData instanceof InitializeAllReplicationUserData)
         {
-          sourceServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
+          sourceServerCI.setHeadingMessage(INFO_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
         }
         sourceServerCI.run();
 
@@ -2871,10 +3135,12 @@
    * is missing, ask the user to provide valid data.
    * We assume that if this method is called we are in interactive mode.
    * @param uData the object to be updated.
+   * @param serversOperations Additional processing for the command
    * @return <CODE>true</CODE> if the object was successfully updated and
    * <CODE>false</CODE> if the user cancelled the operation.
    */
-  private boolean promptIfRequired(InitializeReplicationUserData uData)
+  private boolean promptIfRequired(SourceDestinationServerUserData uData,
+      OperationBetweenSourceAndDestinationServers serversOperations)
   {
     boolean cancelled = false;
 
@@ -2903,7 +3169,7 @@
     {
       try
       {
-        sourceServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
+        sourceServerCI.setHeadingMessage(INFO_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
         sourceServerCI.run();
         hostSource = sourceServerCI.getHostName();
         portSource = sourceServerCI.getPortNumber();
@@ -2966,7 +3232,7 @@
     {
       try
       {
-        destinationServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_DESTINATION_CONNECTION_PARAMETERS.get());
+        destinationServerCI.setHeadingMessage(INFO_INITIALIZE_DESTINATION_CONNECTION_PARAMETERS.get());
         destinationServerCI.run();
         hostDestination = destinationServerCI.getHostName();
         portDestination = destinationServerCI.getPortNumber();
@@ -2977,7 +3243,7 @@
         {
           portDestination = -1;
           errPrintln();
-          errPrintln(ERR_REPLICATION_INITIALIZE_SAME_SERVER_PORT.get(hostSource, portSource));
+          errPrintln(ERR_SOURCE_DESTINATION_INITIALIZE_SAME_SERVER_PORT.get(hostSource, portSource));
           errPrintln();
           error = true;
         }
@@ -3016,17 +3282,14 @@
     if (!cancelled)
     {
       LinkedList<String> suffixes = argParser.getBaseDNs();
-      checkSuffixesForInitializeReplication(suffixes, ctxSource, ctxDestination, true);
-      cancelled = suffixes.isEmpty();
-
+      cancelled = serversOperations.continueAfterUserInput(suffixes, ctxSource, ctxDestination, true);
       uData.setBaseDNs(suffixes);
     }
 
     if (!cancelled)
     {
-      // Ask for confirmation to initialize.
       println();
-      cancelled = !askConfirmation(getPrompt(uData, ctxSource, ctxDestination), true);
+      cancelled = serversOperations.confirmOperation(uData, ctxSource, ctxDestination, true);
       println();
     }
 
@@ -3034,8 +3297,8 @@
     return !cancelled;
   }
 
-  private LocalizableMessage getPrompt(InitializeReplicationUserData uData, InitialLdapContext ctxSource,
-      InitialLdapContext ctxDestination)
+  private LocalizableMessage getInitializeReplicationPrompt(SourceDestinationServerUserData uData,
+      InitialLdapContext ctxSource, InitialLdapContext ctxDestination)
   {
     String hostPortSource = getHostPort(ctxSource);
     String hostPortDestination = getHostPort(ctxDestination);
@@ -3138,7 +3401,7 @@
    * user.
    * @param uData the initialize replication user data object to be initialized.
    */
-  private void initializeWithArgParser(InitializeReplicationUserData uData)
+  private void initializeWithArgParser(SourceDestinationServerUserData uData)
   {
     initialize(uData);
 
@@ -3250,8 +3513,7 @@
     }
     catch (Throwable t)
     {
-      logger.warn(LocalizableMessage.raw(
-          "Unexpected error retrieving the replication port: "+t, t));
+      logger.warn(LocalizableMessage.raw("Unexpected error retrieving the replication port: " + t, t));
     }
     return replicationPort;
   }
@@ -3547,8 +3809,7 @@
    * @return a Collection containing a list of suffixes that are replicated
    * (or those that can be replicated) in two servers.
    */
-  private Collection<String> getCommonSuffixes(
-      InitialLdapContext ctx1, InitialLdapContext ctx2, SuffixRelationType type)
+  private List<String> getCommonSuffixes(InitialLdapContext ctx1, InitialLdapContext ctx2, SuffixRelationType type)
   {
     LinkedList<String> suffixes = new LinkedList<>();
     try
@@ -4005,11 +4266,10 @@
    * @return ReplicationCliReturnCode.SUCCESSFUL if the operation was
    * successful and an error code otherwise.
    */
-  private ReplicationCliReturnCode initializeReplication(
-      InitializeReplicationUserData uData)
+  private ReplicationCliReturnCode initializeReplication(SourceDestinationServerUserData uData)
   {
-    InitialLdapContext ctxSource = createAdministrativeContext(uData, true);
-    InitialLdapContext ctxDestination = createAdministrativeContext(uData, false);
+    InitialLdapContext ctxSource = createAdministrativeContext(uData, uData.getSource());
+    InitialLdapContext ctxDestination = createAdministrativeContext(uData, uData.getDestination());
     try
     {
       if (ctxSource == null || ctxDestination == null)
@@ -4056,22 +4316,19 @@
     }
   }
 
-  private InitialLdapContext createAdministrativeContext(InitializeReplicationUserData uData, boolean isSource)
+  private InitialLdapContext createAdministrativeContext(SourceDestinationServerUserData uData, HostPort server)
   {
-    final String host = isSource ? uData.getHostNameSource() : uData.getHostNameDestination();
-    final int port = isSource ? uData.getPortSource() : uData.getPortDestination();
     try
     {
       return createAdministrativeContext(
-          host, port, useSSL, useStartTLS,
+          server.getHost(), server.getPort(), useSSL, useStartTLS,
           getAdministratorDN(uData.getAdminUid()), uData.getAdminPwd(),
           getConnectTimeout(), getTrustManager(sourceServerCI));
     }
     catch (NamingException ne)
     {
-      final String hostPort = getServerRepresentation(host, port);
       errPrintln();
-      errPrintln(getMessageForException(ne, hostPort));
+      errPrintln(getMessageForException(ne, server.toString()));
       logger.error(LocalizableMessage.raw("Complete error stack:"), ne);
       return null;
     }
@@ -6890,8 +7147,7 @@
       public void progressUpdate(ProgressUpdateEvent ev)
       {
         LocalizableMessage newLogDetails = ev.getNewLogs();
-        if (newLogDetails != null
-            && !"".equals(newLogDetails.toString().trim()))
+        if (newLogDetails != null && !"".equals(newLogDetails.toString().trim()))
         {
           print(newLogDetails);
           println();
@@ -6904,8 +7160,7 @@
     {
       try
       {
-        installer.initializeSuffix(ctxDestination, replicationId, baseDN,
-            displayProgress, getHostPort(ctxSource));
+        installer.initializeSuffix(ctxDestination, replicationId, baseDN, displayProgress, getHostPort(ctxSource));
         initDone = true;
       }
       catch (PeerNotFoundException pnfe)
@@ -7019,81 +7274,37 @@
   private void postPreExternalInitialization(String baseDN,
       InitialLdapContext ctx, boolean isPre) throws ReplicationCliException
   {
-    boolean taskCreated = false;
-    int i = 1;
     boolean isOver = false;
     String dn = null;
-    BasicAttributes attrs = new BasicAttributes();
-    Attribute oc = new BasicAttribute("objectclass");
-    oc.add("top");
-    oc.add("ds-task");
-    oc.add("ds-task-reset-generation-id");
-    attrs.put(oc);
-    attrs.put("ds-task-class-name",
-        "org.opends.server.tasks.SetGenerationIdTask");
+    Map<String, String> attrMap = new TreeMap<>();
     if (isPre)
     {
-      attrs.put("ds-task-reset-generation-id-new-value", "-1");
+      attrMap.put("ds-task-reset-generation-id-new-value", "-1");
     }
-    attrs.put("ds-task-reset-generation-id-domain-base-dn", baseDN);
-    while (!taskCreated)
+    attrMap.put("ds-task-reset-generation-id-domain-base-dn", baseDN);
+
+    try {
+      dn = createServerTask(ctx, "ds-task-reset-generation-id", "org.opends.server.tasks.SetGenerationIdTask",
+          "dsreplication-reset-generation-id", attrMap);
+    }
+    catch (NamingException ne)
     {
-      String id = "dsreplication-reset-generation-id-"+i;
-      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
-      attrs.put("ds-task-id", id);
-      try
-      {
-        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
-        taskCreated = true;
-        logger.info(LocalizableMessage.raw("created task entry: "+attrs));
-        dirCtx.close();
-      }
-      catch (NameAlreadyBoundException x)
-      {
-      }
-      catch (NamingException ne)
-      {
-        logger.error(LocalizableMessage.raw("Error creating task "+attrs, ne));
-        LocalizableMessage msg = isPre ?
-        ERR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION.get():
+      LocalizableMessage msg = isPre ?
+          ERR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION.get():
           ERR_LAUNCHING_POST_EXTERNAL_INITIALIZATION.get();
-        ReplicationCliReturnCode code = isPre?
-            ERROR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION:
-              ERROR_LAUNCHING_POST_EXTERNAL_INITIALIZATION;
-        throw new ReplicationCliException(
-            getThrowableMsg(msg, ne), code, ne);
-      }
-      i++;
+      ReplicationCliReturnCode code = isPre?
+          ERROR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION:
+          ERROR_LAUNCHING_POST_EXTERNAL_INITIALIZATION;
+      throw new ReplicationCliException(getThrowableMsg(msg, ne), code, ne);
     }
-    // Wait until it is over
-    SearchControls searchControls = new SearchControls();
-    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
-    searchControls.setReturningAttributes(
-        new String[] {
-            "ds-task-log-message",
-            "ds-task-state"
-        });
-    String filter = "objectclass=*";
+
     String lastLogMsg = null;
     while (!isOver)
     {
       sleepCatchInterrupt(500);
       try
       {
-        NamingEnumeration<SearchResult> res =
-          ctx.search(dn, filter, searchControls);
-        SearchResult sr = null;
-        try
-        {
-          while (res.hasMore())
-          {
-            sr = res.next();
-          }
-        }
-        finally
-        {
-          res.close();
-        }
+        SearchResult sr = getLastSearchResult(ctx, dn, "ds-task-log-message", "ds-task-state");
         String logMsg = getFirstValue(sr, "ds-task-log-message");
         if (logMsg != null && !logMsg.equals(lastLogMsg))
         {
@@ -7106,7 +7317,7 @@
         if (helper.isDone(state) || helper.isStoppedByError(state))
         {
           isOver = true;
-          LocalizableMessage errorMsg = getPrePostErrorMsg(isPre, lastLogMsg, state, ctx);
+          LocalizableMessage errorMsg = getPrePostErrorMsg(lastLogMsg, state, ctx);
 
           if (helper.isCompletedWithErrors(state))
           {
@@ -7130,27 +7341,20 @@
       }
       catch (NamingException ne)
       {
-        LocalizableMessage msg = isPre ?
-            ERR_POOLING_PRE_EXTERNAL_INITIALIZATION.get():
-              ERR_POOLING_POST_EXTERNAL_INITIALIZATION.get();
-            throw new ReplicationCliException(
-                getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
+        throw new ReplicationCliException(getThrowableMsg(ERR_READING_SERVER_TASK_PROGRESS.get(), ne),
+            ERROR_CONNECTING, ne);
       }
     }
   }
 
-  private LocalizableMessage getPrePostErrorMsg(boolean isPre, String lastLogMsg, String state, InitialLdapContext ctx)
+  private LocalizableMessage getPrePostErrorMsg(String lastLogMsg, String state, InitialLdapContext ctx)
   {
     String server = getHostPort(ctx);
     if (lastLogMsg != null)
     {
-      return isPre
-          ? INFO_ERROR_DURING_PRE_EXTERNAL_INITIALIZATION_LOG.get(lastLogMsg, state, server)
-          : INFO_ERROR_DURING_POST_EXTERNAL_INITIALIZATION_LOG.get(lastLogMsg, state, server);
+      return ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
     }
-    return isPre
-        ? INFO_ERROR_DURING_PRE_EXTERNAL_INITIALIZATION_NO_LOG.get(state, server)
-        : INFO_ERROR_DURING_POST_EXTERNAL_INITIALIZATION_NO_LOG.get(state, server);
+    return ERR_UNEXPECTED_DURING_TASK_NO_LOG.get(state, server);
   }
 
   private void sleepCatchInterrupt(long millis)
@@ -7179,58 +7383,23 @@
       boolean displayProgress)
   throws ClientException, PeerNotFoundException
   {
-    boolean taskCreated = false;
-    int i = 1;
     boolean isOver = false;
     String dn = null;
     String serverDisplay = getHostPort(ctx);
-    BasicAttributes attrs = new BasicAttributes();
-    Attribute oc = new BasicAttribute("objectclass");
-    oc.add("top");
-    oc.add("ds-task");
-    oc.add("ds-task-initialize-remote-replica");
-    attrs.put(oc);
-    attrs.put("ds-task-class-name",
-        "org.opends.server.tasks.InitializeTargetTask");
-    attrs.put("ds-task-initialize-domain-dn", baseDN);
-    attrs.put("ds-task-initialize-replica-server-id", "all");
-    while (!taskCreated)
+    Map<String, String> attrsMap = new TreeMap<>();
+    attrsMap.put("ds-task-initialize-domain-dn", baseDN);
+    attrsMap.put("ds-task-initialize-replica-server-id", "all");
+    try
     {
-      String id = "dsreplication-initialize"+i;
-      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
-      attrs.put("ds-task-id", id);
-      try
-      {
-        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
-        taskCreated = true;
-        logger.info(LocalizableMessage.raw("created task entry: "+attrs));
-        dirCtx.close();
-      }
-      catch (NameAlreadyBoundException x)
-      {
-        logger.warn(LocalizableMessage.raw("A task with dn: "+dn+" already existed."));
-      }
-      catch (NamingException ne)
-      {
-        logger.error(LocalizableMessage.raw("Error creating task "+attrs, ne));
-        throw new ClientException(
-            ReturnCode.APPLICATION_ERROR,
-                getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(
-                        serverDisplay), ne), ne);
-      }
-      i++;
+      dn = createServerTask(ctx, "ds-task-initialize-remote-replica", "org.opends.server.tasks.InitializeTargetTask",
+          "dsreplication-initialize", attrsMap);
     }
-    // Wait until it is over
-    SearchControls searchControls = new SearchControls();
-    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
-    searchControls.setReturningAttributes(
-        new String[] {
-            "ds-task-unprocessed-entry-count",
-            "ds-task-processed-entry-count",
-            "ds-task-log-message",
-            "ds-task-state"
-        });
-    String filter = "objectclass=*";
+    catch (NamingException ne)
+    {
+      throw new ClientException(ReturnCode.APPLICATION_ERROR,
+              getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(serverDisplay), ne), ne);
+    }
+
     LocalizableMessage lastDisplayedMsg = null;
     String lastLogMsg = null;
     long lastTimeMsgDisplayed = -1;
@@ -7241,23 +7410,10 @@
       sleepCatchInterrupt(500);
       try
       {
-        NamingEnumeration<SearchResult> res =
-          ctx.search(dn, filter, searchControls);
-        SearchResult sr = null;
-        try
-        {
-          while (res.hasMore())
-          {
-            sr = res.next();
-          }
-        }
-        finally
-        {
-          res.close();
-        }
+        SearchResult sr = getLastSearchResult(ctx, dn, "ds-task-unprocessed-entry-count",
+            "ds-task-processed-entry-count", "ds-task-log-message", "ds-task-state" );
 
-        // Get the number of entries that have been handled and
-        // a percentage...
+        // Get the number of entries that have been handled and a percentage...
         String sProcessed = getFirstValue(sr, "ds-task-processed-entry-count");
         String sUnprocessed = getFirstValue(sr, "ds-task-unprocessed-entry-count");
         long processed = -1;
@@ -7376,6 +7532,65 @@
     }
   }
 
+  private SearchResult getLastSearchResult(InitialLdapContext ctx, String dn, String... returnedAttributes)
+      throws NamingException
+  {
+    SearchControls searchControls = new SearchControls();
+    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
+    searchControls.setReturningAttributes(returnedAttributes);
+    NamingEnumeration<SearchResult> res = ctx.search(dn, "objectclass=*", searchControls);
+    try
+    {
+      SearchResult sr = null;
+      while (res.hasMore())
+      {
+        sr = res.next();
+      }
+      return sr;
+    }
+    finally
+    {
+      res.close();
+    }
+  }
+
+  private String createServerTask(InitialLdapContext ctx, String taskObjectclass,
+      String taskJavaClass, String taskID, Map<String, String> taskAttrs) throws NamingException
+  {
+    int i = 1;
+    String dn = "";
+    BasicAttributes attrs = new BasicAttributes();
+    attrs.put("objectclass", taskObjectclass);
+    attrs.put("ds-task-class-name", taskJavaClass);
+    for (Map.Entry<String, String> attr : taskAttrs.entrySet())
+    {
+      attrs.put(attr.getKey(), attr.getValue());
+    }
+
+    while (true)
+    {
+      String id = taskID + "-" + i;
+      dn = "ds-task-id=" + id + ",cn=Scheduled Tasks,cn=Tasks";
+      try
+      {
+        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
+        logger.info(LocalizableMessage.raw("created task entry: " + attrs));
+        dirCtx.close();
+        return dn;
+      }
+      catch (NameAlreadyBoundException x)
+      {
+        logger.warn(LocalizableMessage.raw("A task with dn: " + dn + " already existed."));
+      }
+      catch (NamingException ne)
+      {
+        logger.error(LocalizableMessage.raw("Error creating task " + attrs, ne));
+        throw ne;
+      }
+      i++;
+    }
+  }
+
   private LocalizableMessage getInitializeAllErrorMsg(String serverDisplay, String lastLogMsg, String state)
   {
     if (lastLogMsg != null)
@@ -7497,8 +7712,7 @@
               if (areDnsEqual(domain.getBaseDN().toString(), baseDN))
               {
                 print(formatter.getFormattedWithPoints(
-                    INFO_REPLICATION_REMOVING_REFERENCES_ON_REMOTE.get(baseDN,
-                        hostPort)));
+                    INFO_REPLICATION_REMOVING_REFERENCES_ON_REMOTE.get(baseDN, hostPort)));
                 Set<String> replServers = domain.getReplicationServer();
                 if (replServers != null)
                 {
@@ -8199,19 +8413,18 @@
   {
     String commandName = getCommandName();
 
-    CommandBuilder commandBuilder =
-      new CommandBuilder(commandName, subcommandName);
+    CommandBuilder commandBuilder = new CommandBuilder(commandName, subcommandName);
 
     if (ENABLE_REPLICATION_SUBCMD_NAME.equals(subcommandName))
     {
       // All the arguments for enable replication are update here.
       updateCommandBuilder(commandBuilder, (EnableReplicationUserData)uData);
     }
-    else if (INITIALIZE_REPLICATION_SUBCMD_NAME.equals(subcommandName))
+    else if (INITIALIZE_REPLICATION_SUBCMD_NAME.equals(subcommandName) ||
+        RESET_CHANGE_NUMBER_SUBCMD_NAME.equals(subcommandName))
     {
       // All the arguments for initialize replication are update here.
-      updateCommandBuilder(commandBuilder,
-          (InitializeReplicationUserData)uData);
+      updateCommandBuilder(commandBuilder, (SourceDestinationServerUserData)uData);
     }
     else if (PURGE_HISTORICAL_SUBCMD_NAME.equals(subcommandName))
     {
@@ -8315,8 +8528,7 @@
         commandBuilder, taskSchedule);
   }
 
-  private void addGlobalArguments(CommandBuilder commandBuilder,
-      ReplicationUserData uData)
+  private void addGlobalArguments(CommandBuilder commandBuilder, ReplicationUserData uData)
   throws ArgumentException
   {
     List<String> baseDNs = uData.getBaseDNs();
@@ -8331,6 +8543,11 @@
     }
     commandBuilder.addArgument(baseDNsArg);
 
+    if (argParser.resetChangeNumber.isPresent())
+    {
+      commandBuilder.addArgument(argParser.resetChangeNumber);
+    }
+
     // Try to find some arguments and put them at the end.
     String[] identifiersToMove ={
         OPTION_LONG_ADMIN_UID,
@@ -8548,8 +8765,8 @@
       {
         if (OPTION_LONG_HOST.equals(arg.getLongIdentifier()))
         {
-          commandBuilder.addArgument(getHostArg("host2", 'O', server2.getHostName(),
-              INFO_DESCRIPTION_ENABLE_REPLICATION_HOST2));
+          commandBuilder.addArgument(
+              getHostArg("host2", 'O', server2.getHostName(), INFO_DESCRIPTION_ENABLE_REPLICATION_HOST2));
         }
         else if (OPTION_LONG_PORT.equals(arg.getLongIdentifier()))
         {
@@ -8714,7 +8931,7 @@
           "usesecondserverasschemasource", null,
           "useSecondServerAsSchemaSource",
           INFO_DESCRIPTION_ENABLE_REPLICATION_USE_SECOND_AS_SCHEMA_SOURCE.get(
-              "--"+argParser.noSchemaReplicationArg.getLongIdentifier())));
+              "--" + argParser.noSchemaReplicationArg.getLongIdentifier())));
     }
   }
 
@@ -8794,7 +9011,7 @@
   }
 
   private void updateCommandBuilder(CommandBuilder commandBuilder,
-      InitializeReplicationUserData uData)
+      SourceDestinationServerUserData uData)
   throws ArgumentException
   {
     // Update the arguments used in the console interaction with the
@@ -9182,8 +9399,8 @@
       String hostPortDestination = getHostPort(ctxDestination);
       if (isInteractive())
       {
-        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_CONFIRMATION.get(
-            hostPortSource, hostPortDestination, hostPortSource, hostPortDestination);
+        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_CONFIRMATION.get(hostPortSource,
+            hostPortDestination, hostPortSource, hostPortDestination);
         if (!askConfirmation(msg, true))
         {
           throw new ReplicationCliException(ERR_REPLICATION_USER_CANCELLED.get(), USER_CANCELLED, null);
@@ -9191,8 +9408,8 @@
       }
       else
       {
-        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_DESCRIPTION.get(
-            hostPortSource, hostPortDestination, hostPortSource, hostPortDestination);
+        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_DESCRIPTION.get(hostPortSource,
+            hostPortDestination, hostPortSource, hostPortDestination);
         println(msg);
         println();
       }
@@ -9234,8 +9451,7 @@
         if (!commonRepServerIDErrors.isEmpty())
         {
           mb.append(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID.get(
-            getMessageFromCollection(commonRepServerIDErrors,
-                Constants.LINE_SEPARATOR)));
+              getMessageFromCollection(commonRepServerIDErrors, Constants.LINE_SEPARATOR)));
         }
         if (!commonDomainIDErrors.isEmpty())
         {
@@ -9244,8 +9460,7 @@
             mb.append(Constants.LINE_SEPARATOR);
           }
           mb.append(ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID.get(
-            getMessageFromCollection(commonDomainIDErrors,
-                Constants.LINE_SEPARATOR)));
+              getMessageFromCollection(commonDomainIDErrors, Constants.LINE_SEPARATOR)));
         }
         throw new ReplicationCliException(mb.toMessage(),
             REPLICATION_ADS_MERGE_NOT_SUPPORTED, null);
@@ -9317,8 +9532,8 @@
       catch (Throwable t)
       {
         logger.error(LocalizableMessage.raw("Error seeding truststore: "+t, t));
-        LocalizableMessage msg = ERR_REPLICATION_ENABLE_SEEDING_TRUSTSTORE.get(
-            getHostPort(adsCtx2.getDirContext()), getHostPort(adsCtx1.getDirContext()), toString(t));
+        LocalizableMessage msg = ERR_REPLICATION_ENABLE_SEEDING_TRUSTSTORE.get(getHostPort(adsCtx2.getDirContext()),
+            getHostPort(adsCtx1.getDirContext()), toString(t));
         throw new ReplicationCliException(msg, ERROR_SEEDING_TRUSTORE, t);
       }
       pointAdder.stop();
@@ -9375,8 +9590,8 @@
       if (server2.isReplicationServer() && server2.getReplicationServerId() == replicationID1
           && !server2.getReplicationServerHostPort().equalsIgnoreCase(replServerHostPort1))
       {
-        commonRepServerIDErrors.add(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID_ARG.get(serverToFind
-            .getHostPort(true), server2.getHostPort(true), replicationID1));
+        commonRepServerIDErrors.add(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID_ARG.get(
+            serverToFind.getHostPort(true), server2.getHostPort(true), replicationID1));
         return true;
       }
     }
@@ -9399,12 +9614,8 @@
           && domain1Id == replica2.getReplicationId())
       {
         commonDomainIDErrors.add(
-            ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID_ARG.get(
-                replica1.getServer().getHostPort(true),
-                suffix1DN,
-                replica2.getServer().getHostPort(true),
-                suffix2.getDN(),
-                domain1Id));
+            ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID_ARG.get(replica1.getServer().getHostPort(true), suffix1DN,
+                replica2.getServer().getHostPort(true), suffix2.getDN(), domain1Id));
         return true;
       }
     }
@@ -9507,11 +9718,8 @@
 
   private boolean displayLogFileAtEnd(String subCommand)
   {
-    final List<String> subCommands = Arrays.asList(
-        ENABLE_REPLICATION_SUBCMD_NAME,
-        DISABLE_REPLICATION_SUBCMD_NAME,
-        INITIALIZE_ALL_REPLICATION_SUBCMD_NAME,
-        INITIALIZE_REPLICATION_SUBCMD_NAME);
+    final List<String> subCommands = Arrays.asList(ENABLE_REPLICATION_SUBCMD_NAME, DISABLE_REPLICATION_SUBCMD_NAME,
+        INITIALIZE_ALL_REPLICATION_SUBCMD_NAME, INITIALIZE_REPLICATION_SUBCMD_NAME, RESET_CHANGE_NUMBER_SUBCMD_NAME);
     return subCommands.contains(subCommand);
   }
 

--
Gitblit v1.10.0