mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Fabio Pistolesi
05.13.2015 d90a03781f4833f726597d735b79b49073940d75
opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
@@ -36,7 +36,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@@ -49,6 +48,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,13 +57,12 @@
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.NoPermissionException;
import javax.naming.directory.Attribute;
import javax.naming.directory.BasicAttribute;
import javax.naming.directory.BasicAttributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import javax.naming.ldap.InitialLdapContext;
import javax.naming.ldap.LdapName;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
@@ -112,6 +111,8 @@
import org.opends.server.tools.tasks.TaskScheduleInteraction;
import org.opends.server.tools.tasks.TaskScheduleUserData;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.HostPort;
import org.opends.server.types.InitializationException;
import org.opends.server.types.NullOutputStream;
import org.opends.server.types.OpenDsException;
@@ -147,6 +148,7 @@
import static com.forgerock.opendj.cli.Utils.*;
import static com.forgerock.opendj.util.OperatingSystem.*;
import static java.util.Collections.*;
import static org.forgerock.util.Utils.*;
import static org.opends.admin.ads.util.ConnectionUtils.*;
import static org.opends.admin.ads.util.PreferredConnection.*;
@@ -220,6 +222,9 @@
    STATUS(STATUS_REPLICATION_SUBCMD_NAME, INFO_REPLICATION_STATUS_MENU_PROMPT.get()),
    /** Replication purge historical. */
    PURGE_HISTORICAL(PURGE_HISTORICAL_SUBCMD_NAME, INFO_REPLICATION_PURGE_HISTORICAL_MENU_PROMPT.get()),
    /** Set changelog change number from another server. */
    RESET_CHANGE_NUMBER(ReplicationCliArgumentParser.RESET_CHANGE_NUMBER_SUBCMD_NAME,
        INFO_DESCRIPTION_RESET_CHANGE_NUMBER.get()),
    /** Cancel operation. */
    CANCEL(null, null);
@@ -256,6 +261,34 @@
    }
  }
  /**
   * Abstract some of the operations when two servers must be queried for information.
   * <p>
   * Both methods return a "stop" flag: the caller stores their result in its {@code cancelled}
   * variable, so {@code true} means the command must not go any further.
   */
  private interface OperationBetweenSourceAndDestinationServers
  {
    /**
     * Runs the command specific processing that follows the collection of the connection parameters
     * and returns whether we should stop processing.
     * Might connect to servers to run configuration checks.
     * <p>
     * Note that, despite the method name, a {@code true} return value means "stop".
     *
     * @param baseDNs user specified baseDNs (NOTE(review): implementations appear to be allowed to
     *          update this collection, confirm against checkSuffixesForInitializeReplication)
     * @param source the source server
     * @param dest the destination server
     * @param interactive if user has to input information
     * @return {@code true} if we should stop, {@code false} if processing can continue
     */
    boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source, InitialLdapContext dest,
        boolean interactive);
    /**
     * Confirm with the user whether the current task should continue.
     *
     * @param uData servers address and authentication parameters
     * @param ctxSource LDAP Context for the source server
     * @param ctxDestination LDAP Context for the destination server
     * @param defaultValue default yes or no
     * @return {@code true} if the current task should be interrupted, {@code false} if the user
     *         confirmed the operation
     */
    boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
        InitialLdapContext ctxDestination, final boolean defaultValue);
  }
  /** The argument parser to be used. */
  private ReplicationCliArgumentParser argParser;
  private FileBasedArgument userProvidedAdminPwdFile;
@@ -527,6 +560,8 @@
      return statusReplication();
    case PURGE_HISTORICAL:
      return purgeHistorical();
    case RESET_CHANGE_NUMBER:
      return resetChangeNumber();
    default:
      return SUCCESSFUL_NOP;
    }
@@ -543,15 +578,7 @@
   */
  private String askForAdministratorUID(String defaultValue, LocalizedLogger logger)
  {
    try
    {
      return readInput(INFO_ADMINISTRATOR_UID_PROMPT.get(), defaultValue);
    }
    catch (ClientException ce)
    {
      logger.warn(LocalizableMessage.raw("Error reading input: " + ce, ce));
      return defaultValue;
    }
    return ask(logger, INFO_ADMINISTRATOR_UID_PROMPT.get(), defaultValue);
  }
  /**
@@ -574,6 +601,19 @@
    }
  }
  private String ask(LocalizedLogger logger, LocalizableMessage msgPrompt, String defaultValue)
  {
    try
    {
      return readInput(msgPrompt, defaultValue);
    }
    catch (ClientException ce)
    {
      logger.warn(LocalizableMessage.raw("Error reading input: " + ce, ce));
      return defaultValue;
    }
  }
  /**
   * Commodity method used to repeatedly ask the user to provide an integer
   * value.
@@ -1316,6 +1356,219 @@
    }
  }
  private ReplicationCliReturnCode resetChangeNumber()
  {
    final String changeNumber;
    final SourceDestinationServerUserData uData = new SourceDestinationServerUserData();
    if (!argParser.isInteractive())
    {
      initializeWithArgParser(uData);
      return resetChangeNumber(uData);
    }
    OperationBetweenSourceAndDestinationServers
        resetChangeNumberOperations = new OperationBetweenSourceAndDestinationServers()
    {
      @Override
      public boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source,
          InitialLdapContext dest, boolean interactive)
      {
        TopologyCacheFilter filter = new TopologyCacheFilter();
        filter.setSearchMonitoringInformation(false);
        if (!argParser.resetChangeNumber.isPresent())
        {
          String cn = getNewestChangeNumber(source);
          if (cn.isEmpty())
          {
            return true;
          }
          argParser.setResetChangeNumber(
              ask(logger, INFO_RESET_CHANGE_NUMBER_TO.get(uData.getSource(), uData.getDestination()), cn));
        }
        return false;
      }
      @Override
      public boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
          InitialLdapContext ctxDestination, boolean defaultValue)
      {
        return !askConfirmation(INFO_RESET_CHANGE_NUMBER_CONFIRM_RESET.get(uData.getDestinationHostPort()),
            defaultValue);
      }
    };
    return promptIfRequired(uData, resetChangeNumberOperations) ? resetChangeNumber(uData) : USER_CANCELLED;
  }
  private ReplicationCliReturnCode resetChangeNumber(SourceDestinationServerUserData uData)
  {
    InitialLdapContext ctxSource = createAdministrativeContext(uData, uData.getSource());
    InitialLdapContext ctxDest = createAdministrativeContext(uData, uData.getDestination());
    if (!getCommonSuffixes(ctxSource, ctxDest, SuffixRelationType.NOT_FULLY_REPLICATED).isEmpty())
    {
      errPrintln(ERROR_RESET_CHANGE_NUMBER_SERVERS_BASEDNS_DIFFER.get(uData.getSourceHostPort(),
          uData.getDestinationHostPort()));
      return ERROR_RESET_CHANGE_NUMBER_BASEDNS_SHOULD_EQUAL;
    }
    if (mustPrintCommandBuilder())
    {
      printNewCommandBuilder(RESET_CHANGE_NUMBER_SUBCMD_NAME, uData);
    }
    try
    {
      String newStartCN;
      if (argParser.resetChangeNumber.isPresent())
      {
        newStartCN = String.valueOf(argParser.getResetChangeNumber());
      }
      else
      {
        newStartCN = getNewestChangeNumber(ctxSource);
        if (newStartCN.isEmpty())
        {
          return ERROR_UNKNOWN_CHANGE_NUMBER;
        }
        argParser.setResetChangeNumber(newStartCN);
      }
      SearchControls ctls = new SearchControls();
      ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);
      ctls.setReturningAttributes(
          new String[] {
              "changeNumber",
              "replicationCSN",
              "targetDN"
          });
      NamingEnumeration<SearchResult> listeners = ctxSource.search(new LdapName("cn=changelog"),
          "(changeNumber=" + newStartCN + ")", ctls);
      if (!listeners.hasMore())
      {
        errPrintln(ERROR_RESET_CHANGE_NUMBER_UNKNOWN_NUMBER.get(newStartCN, uData.getSourceHostPort()));
        return ERROR_UNKNOWN_CHANGE_NUMBER;
      }
      SearchResult sr = listeners.next();
      String newStartCSN = getFirstValue(sr, "replicationCSN");
      if (newStartCSN == null)
      {
        errPrintln(ERROR_RESET_CHANGE_NUMBER_NO_CSN_FOUND.get(newStartCN, uData.getSourceHostPort()));
        return ERROR_RESET_CHANGE_NUMBER_NO_CSN;
      }
      String targetDN = getFirstValue(sr, "targetDN");
      DN targetBaseDN = DN.rootDN();
      try
      {
        for (String adn : getCommonSuffixes(ctxSource, ctxDest, SuffixRelationType.REPLICATED))
        {
          DN dn = DN.valueOf(adn);
          if (DN.valueOf(targetDN).isDescendantOf(dn) && dn.isDescendantOf(targetBaseDN))
          {
            targetBaseDN = dn;
          }
        }
      }
      catch (DirectoryException de)
      {
        errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(de.getLocalizedMessage()));
        return ERROR_RESET_CHANGE_NUMBER_PROBLEM;
      }
      if (targetBaseDN.isRootDN())
      {
        errPrintln(ERROR_RESET_CHANGE_NUMBER_NO_BASEDN.get(newStartCN, targetDN, newStartCSN));
        return ERROR_RESET_CHANGE_NUMBER_UNKNOWN_BASEDN;
      }
      logger.info(INFO_RESET_CHANGE_NUMBER_INFO.get(uData.getDestinationHostPort(),
          newStartCN, newStartCSN, targetBaseDN.toString()));
      Map<String, String> taskAttrs = new TreeMap<>();
      taskAttrs.put("ds-task-reset-change-number-to", newStartCN);
      taskAttrs.put("ds-task-reset-change-number-csn", newStartCSN);
      taskAttrs.put("ds-task-reset-change-number-base-dn", targetBaseDN.toString());
      String taskDN = createServerTask(ctxDest,
          "ds-task-reset-change-number", "org.opends.server.tasks.ResetChangeNumberTask", "dsreplication-reset-cn",
          taskAttrs);
      waitUntilResetChangeNumberTaskEnds(ctxDest, taskDN);
      return SUCCESSFUL;
    }
    catch (ReplicationCliException | NamingException | NullPointerException e)
    {
      errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(e.getLocalizedMessage()));
      return ERROR_RESET_CHANGE_NUMBER_PROBLEM;
    }
  }
  private String getNewestChangeNumber(InitialLdapContext source)
  {
    try
    {
      SearchControls ctls = new SearchControls();
      ctls.setSearchScope(SearchControls.OBJECT_SCOPE);
      ctls.setReturningAttributes(new String[] {"lastChangeNumber"});
      NamingEnumeration<SearchResult> results = source.search(new LdapName(""), "objectclass=*", ctls);
      if (results.hasMore()) {
        return getFirstValue(results.next(), "lastChangeNumber");
      }
    }
    catch (NamingException e)
    {
      errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(e.getLocalizedMessage()));
    }
    return "";
  }
  private void waitUntilResetChangeNumberTaskEnds(InitialLdapContext server, String taskDN)
      throws ReplicationCliException
  {
    String lastLogMsg = null;
    while (true)
    {
      sleepCatchInterrupt(500);
      try
      {
        SearchResult sr = getLastSearchResult(server, taskDN, "ds-task-log-message", "ds-task-state" );
        String logMsg = getFirstValue(sr, "ds-task-log-message");
        if (logMsg != null && !logMsg.equals(lastLogMsg))
        {
          logger.info(LocalizableMessage.raw(logMsg));
          lastLogMsg = logMsg;
        }
        InstallerHelper helper = new InstallerHelper();
        String state = getFirstValue(sr, "ds-task-state");
        if (helper.isDone(state) || helper.isStoppedByError(state))
        {
          LocalizableMessage errorMsg = ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
          if (helper.isCompletedWithErrors(state))
          {
            logger.warn(LocalizableMessage.raw("Completed with error: " + errorMsg));
            errPrintln(errorMsg);
          }
          else if (!helper.isSuccessful(state) || helper.isStoppedByError(state))
          {
            logger.warn(LocalizableMessage.raw("Error: " + errorMsg));
            throw new ReplicationCliException(errorMsg, ERROR_LAUNCHING_RESET_CHANGE_NUMBER, null);
          }
          else
          {
            print(INFO_RESET_CHANGE_NUMBER_TASK_FINISHED.get());
            println();
          }
          return;
        }
      }
      catch (NameNotFoundException x)
      {
        return;
      }
      catch (NamingException ne)
      {
        throw new ReplicationCliException(getThrowableMsg(ERR_READING_SERVER_TASK_PROGRESS.get(), ne),
            ERROR_CONNECTING, ne);
      }
    }
  }
  private InitialLdapContext createAdministrativeContext(MonoServerReplicationUserData uData)
  {
    final String bindDn = getAdministratorDN(uData.getAdminUid());
@@ -1411,40 +1664,22 @@
            getThrowableMsg(msg, ne), code, ne);
      }
    }
    // Wait until it is over
    SearchControls searchControls = new SearchControls();
    searchControls.setCountLimit(1);
    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
    searchControls.setReturningAttributes(
        new String[] {
            "ds-task-log-message",
            "ds-task-state",
            "ds-task-purge-conflicts-historical-purged-values-count",
            "ds-task-purge-conflicts-historical-purge-completed-in-time",
            "ds-task-purge-conflicts-historical-purge-completed-in-time",
            "ds-task-purge-conflicts-historical-last-purged-changenumber"
        });
    String filter = "objectclass=*";
    String lastLogMsg = null;
    // Polling only makes sense when we are recurrently scheduling a task
    // or the task is being executed now.
    String lastLogMsg = null;
    while (!isOver && uData.getTaskSchedule().getStartDate() == null)
    {
      sleepCatchInterrupt(500);
      try
      {
        NamingEnumeration<SearchResult> res =
          ctx.search(dn, filter, searchControls);
        SearchResult sr = null;
        try
        {
          sr = res.next();
        }
        finally
        {
          res.close();
        }
        SearchResult sr = getFirstSearchResult(ctx, dn,
            "ds-task-log-message",
            "ds-task-state",
            "ds-task-purge-conflicts-historical-purged-values-count",
            "ds-task-purge-conflicts-historical-purge-completed-in-time",
            "ds-task-purge-conflicts-historical-purge-completed-in-time",
            "ds-task-purge-conflicts-historical-last-purged-changenumber");
        String logMsg = getFirstValue(sr, "ds-task-log-message");
        if (logMsg != null && !logMsg.equals(lastLogMsg))
        {
@@ -1479,7 +1714,7 @@
      }
      catch (NamingException ne)
      {
        LocalizableMessage msg = ERR_POOLING_PURGE_HISTORICAL.get();
        LocalizableMessage msg = ERR_READING_SERVER_TASK_PROGRESS.get();
        throw new ReplicationCliException(
          getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
      }
@@ -1492,14 +1727,34 @@
    return returnCode;
  }
  private SearchResult getFirstSearchResult(InitialLdapContext ctx, String dn, String... returnedAttributes)
      throws NamingException
  {
    SearchControls searchControls = new SearchControls();
    searchControls.setCountLimit(1);
    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
    searchControls.setReturningAttributes(returnedAttributes);
    NamingEnumeration<SearchResult> res = ctx.search(dn, "objectclass=*", searchControls);
    try
    {
      SearchResult sr = null;
      sr = res.next();
      return sr;
    }
    finally
    {
      res.close();
    }
  }
  private LocalizableMessage getPurgeErrorMsg(String lastLogMsg, String state, InitialLdapContext ctx)
  {
    String server = getHostPort(ctx);
    if (lastLogMsg != null)
    {
      return INFO_ERROR_DURING_PURGE_HISTORICAL_LOG.get(lastLogMsg, state, server);
      return ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
    }
    return INFO_ERROR_DURING_PURGE_HISTORICAL_NO_LOG.get(state, server);
    return ERR_UNEXPECTED_DURING_TASK_NO_LOG.get(state, server);
  }
  /**
@@ -1556,7 +1811,7 @@
        }
        replica.setBackendName(backend.getBackendID());
        replica.setSuffix(suffix);
        suffix.setReplicas(Collections.singleton(replica));
        suffix.setReplicas(singleton(replica));
        replicas.add(replica);
      }
@@ -1685,23 +1940,32 @@
   */
  private ReplicationCliReturnCode initializeReplication()
  {
    InitializeReplicationUserData uData = new InitializeReplicationUserData();
    if (argParser.isInteractive())
    {
      if (promptIfRequired(uData))
      {
        return initializeReplication(uData);
      }
      else
      {
        return USER_CANCELLED;
      }
    }
    else
    SourceDestinationServerUserData uData = new SourceDestinationServerUserData();
    if (!argParser.isInteractive())
    {
      initializeWithArgParser(uData);
      return initializeReplication(uData);
    }
    OperationBetweenSourceAndDestinationServers
        initializeReplicationOperations = new OperationBetweenSourceAndDestinationServers()
    {
      @Override
      public boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source,
          InitialLdapContext dest, boolean interactive)
      {
        checkSuffixesForInitializeReplication(baseDNs, source, dest, interactive);
        return baseDNs.isEmpty();
      }
      @Override
      public boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
          InitialLdapContext ctxDestination, boolean defaultValue)
      {
        return !askConfirmation(getInitializeReplicationPrompt(uData, ctxSource, ctxDestination), defaultValue);
      }
    };
    return promptIfRequired(uData, initializeReplicationOperations) ? initializeReplication(uData) : USER_CANCELLED;
  }
  /**
@@ -2782,7 +3046,7 @@
      {
        if (uData instanceof InitializeAllReplicationUserData)
        {
          sourceServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
          sourceServerCI.setHeadingMessage(INFO_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
        }
        sourceServerCI.run();
@@ -2871,10 +3135,12 @@
   * is missing, ask the user to provide valid data.
   * We assume that if this method is called we are in interactive mode.
   * @param uData the object to be updated.
   * @param serversOperations Additional processing for the command
   * @return <CODE>true</CODE> if the object was successfully updated and
   * <CODE>false</CODE> if the user cancelled the operation.
   */
  private boolean promptIfRequired(InitializeReplicationUserData uData)
  private boolean promptIfRequired(SourceDestinationServerUserData uData,
      OperationBetweenSourceAndDestinationServers serversOperations)
  {
    boolean cancelled = false;
@@ -2903,7 +3169,7 @@
    {
      try
      {
        sourceServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
        sourceServerCI.setHeadingMessage(INFO_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
        sourceServerCI.run();
        hostSource = sourceServerCI.getHostName();
        portSource = sourceServerCI.getPortNumber();
@@ -2966,7 +3232,7 @@
    {
      try
      {
        destinationServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_DESTINATION_CONNECTION_PARAMETERS.get());
        destinationServerCI.setHeadingMessage(INFO_INITIALIZE_DESTINATION_CONNECTION_PARAMETERS.get());
        destinationServerCI.run();
        hostDestination = destinationServerCI.getHostName();
        portDestination = destinationServerCI.getPortNumber();
@@ -2977,7 +3243,7 @@
        {
          portDestination = -1;
          errPrintln();
          errPrintln(ERR_REPLICATION_INITIALIZE_SAME_SERVER_PORT.get(hostSource, portSource));
          errPrintln(ERR_SOURCE_DESTINATION_INITIALIZE_SAME_SERVER_PORT.get(hostSource, portSource));
          errPrintln();
          error = true;
        }
@@ -3016,17 +3282,14 @@
    if (!cancelled)
    {
      LinkedList<String> suffixes = argParser.getBaseDNs();
      checkSuffixesForInitializeReplication(suffixes, ctxSource, ctxDestination, true);
      cancelled = suffixes.isEmpty();
      cancelled = serversOperations.continueAfterUserInput(suffixes, ctxSource, ctxDestination, true);
      uData.setBaseDNs(suffixes);
    }
    if (!cancelled)
    {
      // Ask for confirmation to initialize.
      println();
      cancelled = !askConfirmation(getPrompt(uData, ctxSource, ctxDestination), true);
      cancelled = serversOperations.confirmOperation(uData, ctxSource, ctxDestination, true);
      println();
    }
@@ -3034,8 +3297,8 @@
    return !cancelled;
  }
  private LocalizableMessage getPrompt(InitializeReplicationUserData uData, InitialLdapContext ctxSource,
      InitialLdapContext ctxDestination)
  private LocalizableMessage getInitializeReplicationPrompt(SourceDestinationServerUserData uData,
      InitialLdapContext ctxSource, InitialLdapContext ctxDestination)
  {
    String hostPortSource = getHostPort(ctxSource);
    String hostPortDestination = getHostPort(ctxDestination);
@@ -3138,7 +3401,7 @@
   * user.
   * @param uData the initialize replication user data object to be initialized.
   */
  private void initializeWithArgParser(InitializeReplicationUserData uData)
  private void initializeWithArgParser(SourceDestinationServerUserData uData)
  {
    initialize(uData);
@@ -3250,8 +3513,7 @@
    }
    catch (Throwable t)
    {
      logger.warn(LocalizableMessage.raw(
          "Unexpected error retrieving the replication port: "+t, t));
      logger.warn(LocalizableMessage.raw("Unexpected error retrieving the replication port: " + t, t));
    }
    return replicationPort;
  }
@@ -3547,8 +3809,7 @@
   * @return a Collection containing a list of suffixes that are replicated
   * (or those that can be replicated) in two servers.
   */
  private Collection<String> getCommonSuffixes(
      InitialLdapContext ctx1, InitialLdapContext ctx2, SuffixRelationType type)
  private List<String> getCommonSuffixes(InitialLdapContext ctx1, InitialLdapContext ctx2, SuffixRelationType type)
  {
    LinkedList<String> suffixes = new LinkedList<>();
    try
@@ -4005,11 +4266,10 @@
   * @return ReplicationCliReturnCode.SUCCESSFUL if the operation was
   * successful and an error code otherwise.
   */
  private ReplicationCliReturnCode initializeReplication(
      InitializeReplicationUserData uData)
  private ReplicationCliReturnCode initializeReplication(SourceDestinationServerUserData uData)
  {
    InitialLdapContext ctxSource = createAdministrativeContext(uData, true);
    InitialLdapContext ctxDestination = createAdministrativeContext(uData, false);
    InitialLdapContext ctxSource = createAdministrativeContext(uData, uData.getSource());
    InitialLdapContext ctxDestination = createAdministrativeContext(uData, uData.getDestination());
    try
    {
      if (ctxSource == null || ctxDestination == null)
@@ -4056,22 +4316,19 @@
    }
  }
  private InitialLdapContext createAdministrativeContext(InitializeReplicationUserData uData, boolean isSource)
  private InitialLdapContext createAdministrativeContext(SourceDestinationServerUserData uData, HostPort server)
  {
    final String host = isSource ? uData.getHostNameSource() : uData.getHostNameDestination();
    final int port = isSource ? uData.getPortSource() : uData.getPortDestination();
    try
    {
      return createAdministrativeContext(
          host, port, useSSL, useStartTLS,
          server.getHost(), server.getPort(), useSSL, useStartTLS,
          getAdministratorDN(uData.getAdminUid()), uData.getAdminPwd(),
          getConnectTimeout(), getTrustManager(sourceServerCI));
    }
    catch (NamingException ne)
    {
      final String hostPort = getServerRepresentation(host, port);
      errPrintln();
      errPrintln(getMessageForException(ne, hostPort));
      errPrintln(getMessageForException(ne, server.toString()));
      logger.error(LocalizableMessage.raw("Complete error stack:"), ne);
      return null;
    }
@@ -6890,8 +7147,7 @@
      public void progressUpdate(ProgressUpdateEvent ev)
      {
        LocalizableMessage newLogDetails = ev.getNewLogs();
        if (newLogDetails != null
            && !"".equals(newLogDetails.toString().trim()))
        if (newLogDetails != null && !"".equals(newLogDetails.toString().trim()))
        {
          print(newLogDetails);
          println();
@@ -6904,8 +7160,7 @@
    {
      try
      {
        installer.initializeSuffix(ctxDestination, replicationId, baseDN,
            displayProgress, getHostPort(ctxSource));
        installer.initializeSuffix(ctxDestination, replicationId, baseDN, displayProgress, getHostPort(ctxSource));
        initDone = true;
      }
      catch (PeerNotFoundException pnfe)
@@ -7019,81 +7274,37 @@
  private void postPreExternalInitialization(String baseDN,
      InitialLdapContext ctx, boolean isPre) throws ReplicationCliException
  {
    boolean taskCreated = false;
    int i = 1;
    boolean isOver = false;
    String dn = null;
    BasicAttributes attrs = new BasicAttributes();
    Attribute oc = new BasicAttribute("objectclass");
    oc.add("top");
    oc.add("ds-task");
    oc.add("ds-task-reset-generation-id");
    attrs.put(oc);
    attrs.put("ds-task-class-name",
        "org.opends.server.tasks.SetGenerationIdTask");
    Map<String, String> attrMap = new TreeMap<>();
    if (isPre)
    {
      attrs.put("ds-task-reset-generation-id-new-value", "-1");
      attrMap.put("ds-task-reset-generation-id-new-value", "-1");
    }
    attrs.put("ds-task-reset-generation-id-domain-base-dn", baseDN);
    while (!taskCreated)
    attrMap.put("ds-task-reset-generation-id-domain-base-dn", baseDN);
    try {
      dn = createServerTask(ctx, "ds-task-reset-generation-id", "org.opends.server.tasks.SetGenerationIdTask",
          "dsreplication-reset-generation-id", attrMap);
    }
    catch (NamingException ne)
    {
      String id = "dsreplication-reset-generation-id-"+i;
      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
      attrs.put("ds-task-id", id);
      try
      {
        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
        taskCreated = true;
        logger.info(LocalizableMessage.raw("created task entry: "+attrs));
        dirCtx.close();
      }
      catch (NameAlreadyBoundException x)
      {
      }
      catch (NamingException ne)
      {
        logger.error(LocalizableMessage.raw("Error creating task "+attrs, ne));
        LocalizableMessage msg = isPre ?
        ERR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION.get():
      LocalizableMessage msg = isPre ?
          ERR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION.get():
          ERR_LAUNCHING_POST_EXTERNAL_INITIALIZATION.get();
        ReplicationCliReturnCode code = isPre?
            ERROR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION:
              ERROR_LAUNCHING_POST_EXTERNAL_INITIALIZATION;
        throw new ReplicationCliException(
            getThrowableMsg(msg, ne), code, ne);
      }
      i++;
      ReplicationCliReturnCode code = isPre?
          ERROR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION:
          ERROR_LAUNCHING_POST_EXTERNAL_INITIALIZATION;
      throw new ReplicationCliException(getThrowableMsg(msg, ne), code, ne);
    }
    // Wait until it is over
    SearchControls searchControls = new SearchControls();
    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
    searchControls.setReturningAttributes(
        new String[] {
            "ds-task-log-message",
            "ds-task-state"
        });
    String filter = "objectclass=*";
    String lastLogMsg = null;
    while (!isOver)
    {
      sleepCatchInterrupt(500);
      try
      {
        NamingEnumeration<SearchResult> res =
          ctx.search(dn, filter, searchControls);
        SearchResult sr = null;
        try
        {
          while (res.hasMore())
          {
            sr = res.next();
          }
        }
        finally
        {
          res.close();
        }
        SearchResult sr = getLastSearchResult(ctx, dn, "ds-task-log-message", "ds-task-state");
        String logMsg = getFirstValue(sr, "ds-task-log-message");
        if (logMsg != null && !logMsg.equals(lastLogMsg))
        {
@@ -7106,7 +7317,7 @@
        if (helper.isDone(state) || helper.isStoppedByError(state))
        {
          isOver = true;
          LocalizableMessage errorMsg = getPrePostErrorMsg(isPre, lastLogMsg, state, ctx);
          LocalizableMessage errorMsg = getPrePostErrorMsg(lastLogMsg, state, ctx);
          if (helper.isCompletedWithErrors(state))
          {
@@ -7130,27 +7341,20 @@
      }
      catch (NamingException ne)
      {
        LocalizableMessage msg = isPre ?
            ERR_POOLING_PRE_EXTERNAL_INITIALIZATION.get():
              ERR_POOLING_POST_EXTERNAL_INITIALIZATION.get();
            throw new ReplicationCliException(
                getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
        throw new ReplicationCliException(getThrowableMsg(ERR_READING_SERVER_TASK_PROGRESS.get(), ne),
            ERROR_CONNECTING, ne);
      }
    }
  }
  private LocalizableMessage getPrePostErrorMsg(boolean isPre, String lastLogMsg, String state, InitialLdapContext ctx)
  private LocalizableMessage getPrePostErrorMsg(String lastLogMsg, String state, InitialLdapContext ctx)
  {
    String server = getHostPort(ctx);
    if (lastLogMsg != null)
    {
      return isPre
          ? INFO_ERROR_DURING_PRE_EXTERNAL_INITIALIZATION_LOG.get(lastLogMsg, state, server)
          : INFO_ERROR_DURING_POST_EXTERNAL_INITIALIZATION_LOG.get(lastLogMsg, state, server);
      return ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
    }
    return isPre
        ? INFO_ERROR_DURING_PRE_EXTERNAL_INITIALIZATION_NO_LOG.get(state, server)
        : INFO_ERROR_DURING_POST_EXTERNAL_INITIALIZATION_NO_LOG.get(state, server);
    return ERR_UNEXPECTED_DURING_TASK_NO_LOG.get(state, server);
  }
  private void sleepCatchInterrupt(long millis)
@@ -7179,58 +7383,23 @@
      boolean displayProgress)
  throws ClientException, PeerNotFoundException
  {
    boolean taskCreated = false;
    int i = 1;
    boolean isOver = false;
    String dn = null;
    String serverDisplay = getHostPort(ctx);
    BasicAttributes attrs = new BasicAttributes();
    Attribute oc = new BasicAttribute("objectclass");
    oc.add("top");
    oc.add("ds-task");
    oc.add("ds-task-initialize-remote-replica");
    attrs.put(oc);
    attrs.put("ds-task-class-name",
        "org.opends.server.tasks.InitializeTargetTask");
    attrs.put("ds-task-initialize-domain-dn", baseDN);
    attrs.put("ds-task-initialize-replica-server-id", "all");
    while (!taskCreated)
    Map<String, String> attrsMap = new TreeMap<>();
    attrsMap.put("ds-task-initialize-domain-dn", baseDN);
    attrsMap.put("ds-task-initialize-replica-server-id", "all");
    try
    {
      String id = "dsreplication-initialize"+i;
      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
      attrs.put("ds-task-id", id);
      try
      {
        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
        taskCreated = true;
        logger.info(LocalizableMessage.raw("created task entry: "+attrs));
        dirCtx.close();
      }
      catch (NameAlreadyBoundException x)
      {
        logger.warn(LocalizableMessage.raw("A task with dn: "+dn+" already existed."));
      }
      catch (NamingException ne)
      {
        logger.error(LocalizableMessage.raw("Error creating task "+attrs, ne));
        throw new ClientException(
            ReturnCode.APPLICATION_ERROR,
                getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(
                        serverDisplay), ne), ne);
      }
      i++;
      dn = createServerTask(ctx, "ds-task-initialize-remote-replica", "org.opends.server.tasks.InitializeTargetTask",
          "dsreplication-initialize", attrsMap);
    }
    // Wait until it is over
    SearchControls searchControls = new SearchControls();
    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
    searchControls.setReturningAttributes(
        new String[] {
            "ds-task-unprocessed-entry-count",
            "ds-task-processed-entry-count",
            "ds-task-log-message",
            "ds-task-state"
        });
    String filter = "objectclass=*";
    catch (NamingException ne)
    {
      throw new ClientException(ReturnCode.APPLICATION_ERROR,
              getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(serverDisplay), ne), ne);
    }
    LocalizableMessage lastDisplayedMsg = null;
    String lastLogMsg = null;
    long lastTimeMsgDisplayed = -1;
@@ -7241,23 +7410,10 @@
      sleepCatchInterrupt(500);
      try
      {
        NamingEnumeration<SearchResult> res =
          ctx.search(dn, filter, searchControls);
        SearchResult sr = null;
        try
        {
          while (res.hasMore())
          {
            sr = res.next();
          }
        }
        finally
        {
          res.close();
        }
        SearchResult sr = getLastSearchResult(ctx, dn, "ds-task-unprocessed-entry-count",
            "ds-task-processed-entry-count", "ds-task-log-message", "ds-task-state" );
        // Get the number of entries that have been handled and
        // a percentage...
        // Get the number of entries that have been handled and a percentage...
        String sProcessed = getFirstValue(sr, "ds-task-processed-entry-count");
        String sUnprocessed = getFirstValue(sr, "ds-task-unprocessed-entry-count");
        long processed = -1;
@@ -7376,6 +7532,65 @@
    }
  }
  private SearchResult getLastSearchResult(InitialLdapContext ctx, String dn, String... returnedAttributes)
      throws NamingException
  {
    SearchControls searchControls = new SearchControls();
    searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
    searchControls.setReturningAttributes(returnedAttributes);
    NamingEnumeration<SearchResult> res = ctx.search(dn, "objectclass=*", searchControls);
    try
    {
      SearchResult sr = null;
      while (res.hasMore())
      {
        sr = res.next();
      }
      return sr;
    }
    finally
    {
      res.close();
    }
  }
  private String createServerTask(InitialLdapContext ctx, String taskObjectclass,
      String taskJavaClass, String taskID, Map<String, String> taskAttrs) throws NamingException
  {
    int i = 1;
    String dn = "";
    BasicAttributes attrs = new BasicAttributes();
    attrs.put("objectclass", taskObjectclass);
    attrs.put("ds-task-class-name", taskJavaClass);
    for (Map.Entry<String, String> attr : taskAttrs.entrySet())
    {
      attrs.put(attr.getKey(), attr.getValue());
    }
    while (true)
    {
      String id = taskID + "-" + i;
      dn = "ds-task-id=" + id + ",cn=Scheduled Tasks,cn=Tasks";
      try
      {
        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
        logger.info(LocalizableMessage.raw("created task entry: " + attrs));
        dirCtx.close();
        return dn;
      }
      catch (NameAlreadyBoundException x)
      {
        logger.warn(LocalizableMessage.raw("A task with dn: " + dn + " already existed."));
      }
      catch (NamingException ne)
      {
        logger.error(LocalizableMessage.raw("Error creating task " + attrs, ne));
        throw ne;
      }
      i++;
    }
  }
  private LocalizableMessage getInitializeAllErrorMsg(String serverDisplay, String lastLogMsg, String state)
  {
    if (lastLogMsg != null)
@@ -7497,8 +7712,7 @@
              if (areDnsEqual(domain.getBaseDN().toString(), baseDN))
              {
                print(formatter.getFormattedWithPoints(
                    INFO_REPLICATION_REMOVING_REFERENCES_ON_REMOTE.get(baseDN,
                        hostPort)));
                    INFO_REPLICATION_REMOVING_REFERENCES_ON_REMOTE.get(baseDN, hostPort)));
                Set<String> replServers = domain.getReplicationServer();
                if (replServers != null)
                {
@@ -8199,19 +8413,18 @@
  {
    String commandName = getCommandName();
    CommandBuilder commandBuilder =
      new CommandBuilder(commandName, subcommandName);
    CommandBuilder commandBuilder = new CommandBuilder(commandName, subcommandName);
    if (ENABLE_REPLICATION_SUBCMD_NAME.equals(subcommandName))
    {
      // All the arguments for enable replication are update here.
      updateCommandBuilder(commandBuilder, (EnableReplicationUserData)uData);
    }
    else if (INITIALIZE_REPLICATION_SUBCMD_NAME.equals(subcommandName))
    else if (INITIALIZE_REPLICATION_SUBCMD_NAME.equals(subcommandName) ||
        RESET_CHANGE_NUMBER_SUBCMD_NAME.equals(subcommandName))
    {
      // All the arguments for initialize replication are update here.
      updateCommandBuilder(commandBuilder,
          (InitializeReplicationUserData)uData);
      updateCommandBuilder(commandBuilder, (SourceDestinationServerUserData)uData);
    }
    else if (PURGE_HISTORICAL_SUBCMD_NAME.equals(subcommandName))
    {
@@ -8315,8 +8528,7 @@
        commandBuilder, taskSchedule);
  }
  private void addGlobalArguments(CommandBuilder commandBuilder,
      ReplicationUserData uData)
  private void addGlobalArguments(CommandBuilder commandBuilder, ReplicationUserData uData)
  throws ArgumentException
  {
    List<String> baseDNs = uData.getBaseDNs();
@@ -8331,6 +8543,11 @@
    }
    commandBuilder.addArgument(baseDNsArg);
    if (argParser.resetChangeNumber.isPresent())
    {
      commandBuilder.addArgument(argParser.resetChangeNumber);
    }
    // Try to find some arguments and put them at the end.
    String[] identifiersToMove ={
        OPTION_LONG_ADMIN_UID,
@@ -8548,8 +8765,8 @@
      {
        if (OPTION_LONG_HOST.equals(arg.getLongIdentifier()))
        {
          commandBuilder.addArgument(getHostArg("host2", 'O', server2.getHostName(),
              INFO_DESCRIPTION_ENABLE_REPLICATION_HOST2));
          commandBuilder.addArgument(
              getHostArg("host2", 'O', server2.getHostName(), INFO_DESCRIPTION_ENABLE_REPLICATION_HOST2));
        }
        else if (OPTION_LONG_PORT.equals(arg.getLongIdentifier()))
        {
@@ -8714,7 +8931,7 @@
          "usesecondserverasschemasource", null,
          "useSecondServerAsSchemaSource",
          INFO_DESCRIPTION_ENABLE_REPLICATION_USE_SECOND_AS_SCHEMA_SOURCE.get(
              "--"+argParser.noSchemaReplicationArg.getLongIdentifier())));
              "--" + argParser.noSchemaReplicationArg.getLongIdentifier())));
    }
  }
@@ -8794,7 +9011,7 @@
  }
  private void updateCommandBuilder(CommandBuilder commandBuilder,
      InitializeReplicationUserData uData)
      SourceDestinationServerUserData uData)
  throws ArgumentException
  {
    // Update the arguments used in the console interaction with the
@@ -9182,8 +9399,8 @@
      String hostPortDestination = getHostPort(ctxDestination);
      if (isInteractive())
      {
        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_CONFIRMATION.get(
            hostPortSource, hostPortDestination, hostPortSource, hostPortDestination);
        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_CONFIRMATION.get(hostPortSource,
            hostPortDestination, hostPortSource, hostPortDestination);
        if (!askConfirmation(msg, true))
        {
          throw new ReplicationCliException(ERR_REPLICATION_USER_CANCELLED.get(), USER_CANCELLED, null);
@@ -9191,8 +9408,8 @@
      }
      else
      {
        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_DESCRIPTION.get(
            hostPortSource, hostPortDestination, hostPortSource, hostPortDestination);
        LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_DESCRIPTION.get(hostPortSource,
            hostPortDestination, hostPortSource, hostPortDestination);
        println(msg);
        println();
      }
@@ -9234,8 +9451,7 @@
        if (!commonRepServerIDErrors.isEmpty())
        {
          mb.append(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID.get(
            getMessageFromCollection(commonRepServerIDErrors,
                Constants.LINE_SEPARATOR)));
              getMessageFromCollection(commonRepServerIDErrors, Constants.LINE_SEPARATOR)));
        }
        if (!commonDomainIDErrors.isEmpty())
        {
@@ -9244,8 +9460,7 @@
            mb.append(Constants.LINE_SEPARATOR);
          }
          mb.append(ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID.get(
            getMessageFromCollection(commonDomainIDErrors,
                Constants.LINE_SEPARATOR)));
              getMessageFromCollection(commonDomainIDErrors, Constants.LINE_SEPARATOR)));
        }
        throw new ReplicationCliException(mb.toMessage(),
            REPLICATION_ADS_MERGE_NOT_SUPPORTED, null);
@@ -9317,8 +9532,8 @@
      catch (Throwable t)
      {
        logger.error(LocalizableMessage.raw("Error seeding truststore: "+t, t));
        LocalizableMessage msg = ERR_REPLICATION_ENABLE_SEEDING_TRUSTSTORE.get(
            getHostPort(adsCtx2.getDirContext()), getHostPort(adsCtx1.getDirContext()), toString(t));
        LocalizableMessage msg = ERR_REPLICATION_ENABLE_SEEDING_TRUSTSTORE.get(getHostPort(adsCtx2.getDirContext()),
            getHostPort(adsCtx1.getDirContext()), toString(t));
        throw new ReplicationCliException(msg, ERROR_SEEDING_TRUSTORE, t);
      }
      pointAdder.stop();
@@ -9375,8 +9590,8 @@
      if (server2.isReplicationServer() && server2.getReplicationServerId() == replicationID1
          && !server2.getReplicationServerHostPort().equalsIgnoreCase(replServerHostPort1))
      {
        commonRepServerIDErrors.add(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID_ARG.get(serverToFind
            .getHostPort(true), server2.getHostPort(true), replicationID1));
        commonRepServerIDErrors.add(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID_ARG.get(
            serverToFind.getHostPort(true), server2.getHostPort(true), replicationID1));
        return true;
      }
    }
@@ -9399,12 +9614,8 @@
          && domain1Id == replica2.getReplicationId())
      {
        commonDomainIDErrors.add(
            ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID_ARG.get(
                replica1.getServer().getHostPort(true),
                suffix1DN,
                replica2.getServer().getHostPort(true),
                suffix2.getDN(),
                domain1Id));
            ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID_ARG.get(replica1.getServer().getHostPort(true), suffix1DN,
                replica2.getServer().getHostPort(true), suffix2.getDN(), domain1Id));
        return true;
      }
    }
@@ -9507,11 +9718,8 @@
  private boolean displayLogFileAtEnd(String subCommand)
  {
    final List<String> subCommands = Arrays.asList(
        ENABLE_REPLICATION_SUBCMD_NAME,
        DISABLE_REPLICATION_SUBCMD_NAME,
        INITIALIZE_ALL_REPLICATION_SUBCMD_NAME,
        INITIALIZE_REPLICATION_SUBCMD_NAME);
    final List<String> subCommands = Arrays.asList(ENABLE_REPLICATION_SUBCMD_NAME, DISABLE_REPLICATION_SUBCMD_NAME,
        INITIALIZE_ALL_REPLICATION_SUBCMD_NAME, INITIALIZE_REPLICATION_SUBCMD_NAME, RESET_CHANGE_NUMBER_SUBCMD_NAME);
    return subCommands.contains(subCommand);
  }