From 6b3ef14a652f6be0d559365d2fd2c78a61524fec Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 17 Sep 2010 22:06:25 +0000
Subject: [PATCH] Minimize Historical Data (dsreplication/client side). The purge historical can be executed on the local server even when it is stopped. This is matches the functionality provided by utilities such import-ldif, backup, etc.

---
 opends/src/server/org/opends/server/tools/dsreplication/ReplicationCliMain.java | 1087 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 1,037 insertions(+), 50 deletions(-)

diff --git a/opends/src/server/org/opends/server/tools/dsreplication/ReplicationCliMain.java b/opends/src/server/org/opends/server/tools/dsreplication/ReplicationCliMain.java
index 30b07cf..addd00d 100644
--- a/opends/src/server/org/opends/server/tools/dsreplication/ReplicationCliMain.java
+++ b/opends/src/server/org/opends/server/tools/dsreplication/ReplicationCliMain.java
@@ -49,6 +49,7 @@
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
@@ -56,6 +57,7 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -90,11 +92,18 @@
 import org.opends.admin.ads.util.ConnectionUtils;
 import org.opends.admin.ads.util.PreferredConnection;
 import org.opends.admin.ads.util.ServerLoader;
+import org.opends.guitools.controlpanel.datamodel.BackendDescriptor;
+import org.opends.guitools.controlpanel.datamodel.BaseDNDescriptor;
+import org.opends.guitools.controlpanel.util.ConfigFromDirContext;
+import org.opends.guitools.controlpanel.util.ConfigFromFile;
 import org.opends.guitools.controlpanel.util.ControlPanelLog;
+import org.opends.guitools.controlpanel.util.ProcessReader;
+import org.opends.guitools.controlpanel.util.Utilities;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.quicksetup.ApplicationException;
 import org.opends.quicksetup.Constants;
+import org.opends.quicksetup.Installation;
 import org.opends.quicksetup.ReturnCode;
 import org.opends.quicksetup.event.ProgressUpdateEvent;
 import org.opends.quicksetup.event.ProgressUpdateListener;
@@ -116,14 +125,19 @@
 import org.opends.server.admin.std.meta.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.tasks.PurgeConflictsHistoricalTask;
 import org.opends.server.tools.ClientException;
 import org.opends.server.tools.ToolConstants;
+import org.opends.server.tools.tasks.TaskEntry;
+import org.opends.server.tools.tasks.TaskScheduleInteraction;
+import org.opends.server.tools.tasks.TaskScheduleUserData;
 import org.opends.server.types.DN;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.NullOutputStream;
 import org.opends.server.types.OpenDsException;
 import org.opends.server.util.ServerConstants;
 import org.opends.server.util.SetupUtils;
+import org.opends.server.util.StaticUtils;
 import org.opends.server.util.args.Argument;
 import org.opends.server.util.args.ArgumentException;
 import org.opends.server.util.args.BooleanArgument;
@@ -136,6 +150,7 @@
 import org.opends.server.util.cli.LDAPConnectionConsoleInteraction;
 import org.opends.server.util.cli.MenuBuilder;
 import org.opends.server.util.cli.MenuResult;
+import org.opends.server.util.cli.PointAdder;
 import org.opends.server.util.table.TableBuilder;
 import org.opends.server.util.table.TextTablePrinter;
 
@@ -158,6 +173,20 @@
   /** Suffix for log files. */
   static public final String LOG_FILE_SUFFIX = ".log";
 
+  /**
+   * Property used to call the dsreplication script and ReplicationCliMain to
+   * know which are the java properties to be used (those of dsreplication or
+   * those of dsreplication.offline).
+   */
+  private static final String SCRIPT_CALL_STATUS =
+    "org.opends.server.dsreplicationcallstatus";
+
+  /**
+   * The value set by the dsreplication script if it is called the first time.
+   */
+  private static final String FIRST_SCRIPT_CALL =
+    "firstcall";
+
   private boolean forceNonInteractive;
 
   private static final Logger LOG =
@@ -204,6 +233,10 @@
      */
     STATUS(INFO_REPLICATION_STATUS_MENU_PROMPT.get()),
     /**
+     * Replication purge historical.
+     */
+    PURGE_HISTORICAL(INFO_REPLICATION_PURGE_HISTORICAL_MENU_PROMPT.get()),
+    /**
      * Cancel operation.
      */
     CANCEL(null);
@@ -389,7 +422,7 @@
           returnValue = ERROR_USER_DATA;
         }
       }
-      if (initializeServer)
+      if (initializeServer && returnValue == SUCCESSFUL_NOP)
       {
         DirectoryServer.bootstrapClient();
 
@@ -484,6 +517,12 @@
           subCommand =
             ReplicationCliArgumentParser.STATUS_REPLICATION_SUBCMD_NAME;
         }
+        else if (argParser.isPurgeHistoricalSubcommand())
+        {
+          returnValue = purgeHistorical();
+          subCommand =
+            ReplicationCliArgumentParser.PURGE_HISTORICAL_SUBCMD_NAME;
+        }
         else
         {
           if (argParser.isInteractive())
@@ -526,6 +565,11 @@
                 ReplicationCliArgumentParser.STATUS_REPLICATION_SUBCMD_NAME;
               break;
 
+            case PURGE_HISTORICAL:
+              subCommand =
+                ReplicationCliArgumentParser.PURGE_HISTORICAL_SUBCMD_NAME;
+              break;
+
             default:
               // User canceled
               returnValue = USER_CANCELLED;
@@ -573,6 +617,11 @@
     return returnValue.getReturnCode();
   }
 
+  private boolean isFirstCallFromScript()
+  {
+    return FIRST_SCRIPT_CALL.equals(System.getProperty(SCRIPT_CALL_STATUS));
+  }
+
   private void createArgumenParser() throws ArgumentException
   {
     argParser = new ReplicationCliArgumentParser(CLASS_NAME);
@@ -783,6 +832,663 @@
   }
 
   /**
+   * Based on the data provided in the command-line it displays replication
+   * status.
+   * @return the error code if the operation failed and SUCCESSFUL if it was
+   * successful.
+   */
+  private ReplicationCliReturnCode purgeHistorical()
+  {
+    ReplicationCliReturnCode returnValue;
+    PurgeHistoricalUserData uData = new PurgeHistoricalUserData();
+
+    if (argParser.isInteractive())
+    {
+      uData = new PurgeHistoricalUserData();
+      if (promptIfRequired(uData))
+      {
+        returnValue = purgeHistorical(uData);
+      }
+      else
+      {
+        returnValue = USER_CANCELLED;
+      }
+    }
+    else
+    {
+      initializeWithArgParser(uData);
+      returnValue = purgeHistorical(uData);
+    }
+    return returnValue;
+  }
+
+  /**
+   * Initializes the contents of the provided purge historical replication user
+   * data object with what was provided in the command-line without prompting to
+   * the user.
+   * @param uData the purge historical replication user data object to be
+   * initialized.
+   */
+  private void initializeWithArgParser(PurgeHistoricalUserData uData)
+  {
+    PurgeHistoricalUserData.initializeWithArgParser(uData, argParser);
+  }
+
+  private ReplicationCliReturnCode purgeHistorical(
+      PurgeHistoricalUserData uData)
+  {
+      ReplicationCliReturnCode returnValue = null;
+      if (uData.isOnline())
+      {
+        returnValue = purgeHistoricalRemotely(uData);
+      }
+      else
+      {
+        returnValue = purgeHistoricalLocally(uData);
+      }
+      return returnValue;
+  }
+
+  private ReplicationCliReturnCode purgeHistoricalLocally(
+      PurgeHistoricalUserData uData)
+  {
+    ReplicationCliReturnCode returnValue;
+    LinkedList<String> baseDNs = uData.getBaseDNs();
+    checkSuffixesForLocalPurgeHistorical(baseDNs, false);
+    if (!baseDNs.isEmpty())
+    {
+      uData.setBaseDNs(baseDNs);
+      printPurgeHistoricalEquivalentIfRequired(uData);
+
+      returnValue = SUCCESSFUL;
+      try
+      {
+        returnValue = purgeHistoricalLocallyTask(uData);
+      }
+      catch (ReplicationCliException rce)
+      {
+        println();
+        println(getCriticalExceptionMessage(rce));
+        returnValue = rce.getErrorCode();
+        LOG.log(Level.SEVERE, "Complete error stack:", rce);
+      }
+    }
+    else
+    {
+      returnValue = HISTORICAL_CANNOT_BE_PURGED_ON_BASEDN;
+    }
+
+    return returnValue;
+  }
+
+  private void printPurgeProgressMessage(PurgeHistoricalUserData uData)
+  {
+    String separator =  formatter.getLineBreak().toString() +
+    formatter.getTab().toString();
+    printlnProgress();
+    Message msg = formatter.getFormattedProgress(
+        INFO_PROGRESS_PURGE_HISTORICAL.get(separator,
+            Utils.getStringFromCollection(uData.getBaseDNs(), separator)));
+    printProgress(msg);
+    printlnProgress();
+
+  }
+
+  private ReplicationCliReturnCode purgeHistoricalLocallyTask(
+      PurgeHistoricalUserData uData)
+  throws ReplicationCliException
+  {
+    ReplicationCliReturnCode returnCode = ReplicationCliReturnCode.SUCCESSFUL;
+    if (isFirstCallFromScript())
+    {
+      // Launch the process: launch dsreplication in non-interactive mode with
+      // the recursive property set.
+      ArrayList<String> args = new ArrayList<String>();
+      args.add(getCommandLinePath(getCommandName()));
+      args.add(ReplicationCliArgumentParser.PURGE_HISTORICAL_SUBCMD_NAME);
+      args.add("--"+argParser.noPromptArg.getLongIdentifier());
+      args.add("--"+argParser.maximumDurationArg.getLongIdentifier());
+      args.add(String.valueOf(uData.getMaximumDuration()));
+      for (String baseDN : uData.getBaseDNs())
+      {
+        args.add("--"+argParser.baseDNsArg.getLongIdentifier());
+        args.add(baseDN);
+      }
+      ProcessBuilder pb = new ProcessBuilder(args);
+      // Use the java args in the script.
+      Map<String, String> env = pb.environment();
+      env.put("RECURSIVE_LOCAL_CALL", "true");
+      try
+      {
+        ProcessReader outReader = null;
+        ProcessReader errReader = null;
+        Process process = pb.start();
+        outReader = new ProcessReader(process, getOutputStream(), false);
+        errReader = new ProcessReader(process, getErrorStream(), true);
+
+        outReader.startReading();
+        errReader.startReading();
+
+        int code = process.waitFor();
+        for (ReplicationCliReturnCode c : ReplicationCliReturnCode.values())
+        {
+          if (c.getReturnCode() == code)
+          {
+            returnCode = c;
+            break;
+          }
+        }
+      }
+      catch (Exception e)
+      {
+        Message msg = ERR_LAUNCHING_PURGE_HISTORICAL.get();
+        ReplicationCliReturnCode code = ERROR_LAUNCHING_PURGE_HISTORICAL;
+        throw new ReplicationCliException(
+            getThrowableMsg(msg, e), code, e);
+      }
+    }
+    else
+    {
+      printPurgeProgressMessage(uData);
+      LocalPurgeHistorical localPurgeHistorical =
+        new LocalPurgeHistorical(uData, this, formatter,
+            argParser.getConfigFile(),
+            argParser.getConfigClass());
+      returnCode = localPurgeHistorical.execute();
+
+      if (returnCode == ReplicationCliReturnCode.SUCCESSFUL)
+      {
+        printSuccessMessage(uData, null);
+      }
+    }
+    return returnCode;
+  }
+
+  private void printPurgeHistoricalEquivalentIfRequired(
+      PurgeHistoricalUserData uData)
+  {
+    if (mustPrintCommandBuilder())
+    {
+      try
+      {
+        CommandBuilder commandBuilder = createCommandBuilder(
+            ReplicationCliArgumentParser.PURGE_HISTORICAL_SUBCMD_NAME,
+            uData);
+        printCommandBuilder(commandBuilder);
+      }
+      catch (Throwable t)
+      {
+        LOG.log(Level.SEVERE, "Error printing equivalente command-line: "+t,
+            t);
+      }
+    }
+  }
+
+  private ReplicationCliReturnCode purgeHistoricalRemotely(
+      PurgeHistoricalUserData uData)
+  {
+    ReplicationCliReturnCode returnValue = SUCCESSFUL_NOP;
+    InitialLdapContext ctx = null;
+
+    // Connect to the provided server
+    try
+    {
+      ctx = createAdministrativeContext(uData.getHostName(), uData.getPort(),
+          useSSL, useStartTLS,
+          ADSContext.getAdministratorDN(uData.getAdminUid()),
+          uData.getAdminPwd(), getConnectTimeout(), getTrustManager());
+    }
+    catch (NamingException ne)
+    {
+      String hostPort =
+        getServerRepresentation(uData.getHostName(), uData.getPort());
+      println();
+      println(getMessageForException(ne, hostPort));
+      LOG.log(Level.SEVERE, "Complete error stack:", ne);
+    }
+
+    if (ctx != null)
+    {
+      LinkedList<String> baseDNs = uData.getBaseDNs();
+      checkSuffixesForPurgeHistorical(baseDNs, ctx, false);
+      if (!baseDNs.isEmpty())
+      {
+        uData.setBaseDNs(baseDNs);
+        printPurgeHistoricalEquivalentIfRequired(uData);
+
+        returnValue = SUCCESSFUL;
+        try
+        {
+          returnValue = purgeHistoricalRemoteTask(ctx, uData);
+        }
+        catch (ReplicationCliException rce)
+        {
+          println();
+          println(getCriticalExceptionMessage(rce));
+          returnValue = rce.getErrorCode();
+          LOG.log(Level.SEVERE, "Complete error stack:", rce);
+        }
+      }
+      else
+      {
+        returnValue = HISTORICAL_CANNOT_BE_PURGED_ON_BASEDN;
+      }
+    }
+    else
+    {
+      returnValue = ERROR_CONNECTING;
+    }
+
+    if (ctx != null)
+    {
+      try
+      {
+        ctx.close();
+      }
+      catch (Throwable t)
+      {
+      }
+    }
+    return returnValue;
+  }
+
+  private void printSuccessMessage(PurgeHistoricalUserData uData, String taskID)
+  {
+    printlnProgress();
+    if (!uData.isOnline())
+    {
+      printProgress(
+          INFO_PROGRESS_PURGE_HISTORICAL_FINISHED_PROCEDURE.get());
+    }
+    else if (uData.getTaskSchedule().isStartNow())
+    {
+      printProgress(INFO_TASK_TOOL_TASK_SUCESSFULL.get(
+          INFO_PURGE_HISTORICAL_TASK_NAME.get(),
+          taskID));
+    }
+    else if (uData.getTaskSchedule().getStartDate() != null)
+    {
+      printProgress(INFO_TASK_TOOL_TASK_SCHEDULED_FUTURE.get(
+          INFO_PURGE_HISTORICAL_TASK_NAME.get(),
+          taskID,
+          StaticUtils.formatDateTimeString(
+              uData.getTaskSchedule().getStartDate())));
+    }
+    else
+    {
+      printProgress(INFO_TASK_TOOL_RECURRING_TASK_SCHEDULED.get(
+          INFO_PURGE_HISTORICAL_TASK_NAME.get(),
+          taskID));
+    }
+
+    printlnProgress();
+  }
+
+  /**
+   * Launches the purge historical operation using the
+   * provided connection.
+   * @param ctx the connection to the server.
+   * @throws ReplicationCliException if there is an error performing the
+   * operation.
+   */
+  private ReplicationCliReturnCode purgeHistoricalRemoteTask(
+      InitialLdapContext ctx,
+      PurgeHistoricalUserData uData)
+  throws ReplicationCliException
+  {
+    printPurgeProgressMessage(uData);
+    ReplicationCliReturnCode returnCode = ReplicationCliReturnCode.SUCCESSFUL;
+    boolean taskCreated = false;
+    int i = 0;
+    boolean isOver = false;
+    String dn = null;
+    String taskID = null;
+    while (!taskCreated)
+    {
+      BasicAttributes attrs = PurgeHistoricalUserData.getTaskAttributes(uData);
+      dn = PurgeHistoricalUserData.getTaskDN(attrs);
+      taskID = PurgeHistoricalUserData.getTaskID(attrs);
+      try
+      {
+        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
+        taskCreated = true;
+        LOG.log(Level.INFO, "created task entry: "+attrs);
+        dirCtx.close();
+      }
+      catch (NameAlreadyBoundException x)
+      {
+      }
+      catch (NamingException ne)
+      {
+        LOG.log(Level.SEVERE, "Error creating task "+attrs, ne);
+        Message msg = ERR_LAUNCHING_PURGE_HISTORICAL.get();
+        ReplicationCliReturnCode code = ERROR_LAUNCHING_PURGE_HISTORICAL;
+        throw new ReplicationCliException(
+            getThrowableMsg(msg, ne), code, ne);
+      }
+      i++;
+    }
+    // Wait until it is over
+    SearchControls searchControls = new SearchControls();
+    searchControls.setCountLimit(1);
+    searchControls.setSearchScope(
+        SearchControls. OBJECT_SCOPE);
+    String filter = "objectclass=*";
+    searchControls.setReturningAttributes(
+        new String[] {
+            "ds-task-log-message",
+            "ds-task-state",
+            "ds-task-purge-conflicts-historical-purged-values-count",
+            "ds-task-purge-conflicts-historical-purge-completed-in-time",
+            "ds-task-purge-conflicts-historical-purge-completed-in-time",
+            "ds-task-purge-conflicts-historical-last-purged-changenumber"
+        });
+    String lastLogMsg = null;
+
+    // Polling only makes sense when we are recurrently scheduling a task
+    // or the task is being executed now.
+    while (!isOver && (uData.getTaskSchedule().getStartDate() == null))
+    {
+      try
+      {
+        Thread.sleep(500);
+      }
+      catch (Throwable t)
+      {
+      }
+      try
+      {
+        NamingEnumeration<SearchResult> res =
+          ctx.search(dn, filter, searchControls);
+        SearchResult sr = null;
+        try
+        {
+          sr = res.next();
+        }
+        finally
+        {
+          res.close();
+        }
+        String logMsg = getFirstValue(sr, "ds-task-log-message");
+        if (logMsg != null)
+        {
+          if (!logMsg.equals(lastLogMsg))
+          {
+            LOG.log(Level.INFO, logMsg);
+            lastLogMsg = logMsg;
+          }
+        }
+        InstallerHelper helper = new InstallerHelper();
+        String state = getFirstValue(sr, "ds-task-state");
+
+        if (helper.isDone(state) || helper.isStoppedByError(state))
+        {
+          isOver = true;
+          Message errorMsg;
+          String server = ConnectionUtils.getHostPort(ctx);
+          if (lastLogMsg == null)
+          {
+            errorMsg = INFO_ERROR_DURING_PURGE_HISTORICAL_NO_LOG.get(
+                state, server);
+          }
+          else
+          {
+            errorMsg = INFO_ERROR_DURING_PURGE_HISTORICAL_LOG.get(
+                lastLogMsg, state, server);
+          }
+
+          if (helper.isCompletedWithErrors(state))
+          {
+            LOG.log(Level.WARNING, "Completed with error: "+errorMsg);
+            println(errorMsg);
+          }
+          else if (!helper.isSuccessful(state) ||
+              helper.isStoppedByError(state))
+          {
+            LOG.log(Level.WARNING, "Error: "+errorMsg);
+            ReplicationCliReturnCode code = ERROR_LAUNCHING_PURGE_HISTORICAL;
+            throw new ReplicationCliException(errorMsg, code, null);
+          }
+        }
+      }
+      catch (NameNotFoundException x)
+      {
+        isOver = true;
+      }
+      catch (NamingException ne)
+      {
+        Message msg = ERR_POOLING_PURGE_HISTORICAL.get();
+        throw new ReplicationCliException(
+          getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
+      }
+    }
+
+    if (returnCode == ReplicationCliReturnCode.SUCCESSFUL)
+    {
+      printSuccessMessage(uData, taskID);
+    }
+    return returnCode;
+  }
+
+  /**
+   * Checks that historical can actually be purged in the provided baseDNs
+   * for the server.
+   * @param suffixes the suffixes provided by the user.  This Collection is
+   * updated with the base DNs that the user provided interactively.
+   * @param ctx connection to the server.
+   * @param interactive whether to ask the user to provide interactively
+   * base DNs if none of the provided base DNs can be purged.
+   */
+  private void checkSuffixesForPurgeHistorical(Collection<String> suffixes,
+      InitialLdapContext ctx, boolean interactive)
+  {
+    TreeSet<String> availableSuffixes = new TreeSet<String>();
+    TreeSet<String> notReplicatedSuffixes = new TreeSet<String>();
+
+    Collection<ReplicaDescriptor> replicas = getReplicas(ctx);
+    for (ReplicaDescriptor rep : replicas)
+    {
+      String dn = rep.getSuffix().getDN();
+      if (rep.isReplicated())
+      {
+        availableSuffixes.add(dn);
+      }
+      else
+      {
+        notReplicatedSuffixes.add(dn);
+      }
+    }
+
+    checkSuffixesForPurgeHistorical(suffixes, availableSuffixes,
+        notReplicatedSuffixes, interactive);
+  }
+
+  /**
+   * Checks that historical can actually be purged in the provided baseDNs
+   * for the local server.
+   * @param suffixes the suffixes provided by the user.  This Collection is
+   * updated with the base DNs that the user provided interactively.
+   * @param interactive whether to ask the user to provide interactively
+   * base DNs if none of the provided base DNs can be purged.
+   */
+  private void checkSuffixesForLocalPurgeHistorical(Collection<String> suffixes,
+      boolean interactive)
+  {
+    TreeSet<String> availableSuffixes = new TreeSet<String>();
+    TreeSet<String> notReplicatedSuffixes = new TreeSet<String>();
+
+    Collection<ReplicaDescriptor> replicas = getLocalReplicas();
+
+    for (ReplicaDescriptor rep : replicas)
+    {
+      String dn = rep.getSuffix().getDN();
+      if (rep.isReplicated())
+      {
+        availableSuffixes.add(dn);
+      }
+      else
+      {
+        notReplicatedSuffixes.add(dn);
+      }
+    }
+
+    checkSuffixesForPurgeHistorical(suffixes, availableSuffixes,
+        notReplicatedSuffixes, interactive);
+  }
+
+  private Collection<ReplicaDescriptor> getLocalReplicas()
+  {
+    Collection<ReplicaDescriptor> replicas = new ArrayList<ReplicaDescriptor>();
+    ConfigFromFile configFromFile = new ConfigFromFile();
+    configFromFile.readConfiguration();
+    Collection<BackendDescriptor> backends = configFromFile.getBackends();
+    for (BackendDescriptor backend : backends)
+    {
+      for (BaseDNDescriptor baseDN : backend.getBaseDns())
+      {
+        SuffixDescriptor suffix = new SuffixDescriptor();
+        suffix.setDN(baseDN.getDn().toString());
+
+        ReplicaDescriptor replica = new ReplicaDescriptor();
+
+        if (baseDN.getType() == BaseDNDescriptor.Type.REPLICATED)
+        {
+          replica.setReplicationId(baseDN.getReplicaID());
+        }
+        else
+        {
+          replica.setReplicationId(-1);
+        }
+        replica.setBackendName(backend.getBackendID());
+        replica.setSuffix(suffix);
+        suffix.setReplicas(Collections.singleton(replica));
+
+        replicas.add(replica);
+      }
+    }
+    return replicas;
+  }
+
+  private void checkSuffixesForPurgeHistorical(Collection<String> suffixes,
+      Collection<String> availableSuffixes,
+      Collection<String> notReplicatedSuffixes,
+      boolean interactive)
+  {
+    if (availableSuffixes.size() == 0)
+    {
+      println();
+      println(ERR_NO_SUFFIXES_AVAILABLE_TO_PURGE_HISTORICAL.get());
+      suffixes.clear();
+    }
+    else
+    {
+      // Verify that the provided suffixes are configured in the servers.
+      TreeSet<String> notFound = new TreeSet<String>();
+      TreeSet<String> alreadyNotReplicated = new TreeSet<String>();
+      for (String dn : suffixes)
+      {
+        boolean found = false;
+        for (String dn1 : availableSuffixes)
+        {
+          if (Utils.areDnsEqual(dn, dn1))
+          {
+            found = true;
+            break;
+          }
+        }
+        if (!found)
+        {
+          boolean notReplicated = false;
+          for (String s : notReplicatedSuffixes)
+          {
+            if (Utils.areDnsEqual(s, dn))
+            {
+              notReplicated = true;
+              break;
+            }
+          }
+          if (notReplicated)
+          {
+            alreadyNotReplicated.add(dn);
+          }
+          else
+          {
+            notFound.add(dn);
+          }
+        }
+      }
+      suffixes.removeAll(notFound);
+      suffixes.removeAll(alreadyNotReplicated);
+      if (notFound.size() > 0)
+      {
+        println();
+        println(ERR_REPLICATION_PURGE_SUFFIXES_NOT_FOUND.get(
+                Utils.getStringFromCollection(notFound,
+                    Constants.LINE_SEPARATOR)));
+      }
+      if (interactive)
+      {
+        boolean confirmationLimitReached = false;
+        while (suffixes.isEmpty())
+        {
+          boolean noSchemaOrAds = false;
+          for (String s: availableSuffixes)
+          {
+            if (!Utils.areDnsEqual(s, ADSContext.getAdministrationSuffixDN()) &&
+                !Utils.areDnsEqual(s, Constants.SCHEMA_DN) &&
+                !Utils.areDnsEqual(s, Constants.REPLICATION_CHANGES_DN))
+            {
+              noSchemaOrAds = true;
+            }
+          }
+          if (!noSchemaOrAds)
+          {
+            // In interactive mode we do not propose to manage the
+            // administration suffix.
+            println();
+            println(ERR_NO_SUFFIXES_AVAILABLE_TO_PURGE_HISTORICAL.get());
+            break;
+          }
+          else
+          {
+            println();
+            println(ERR_NO_SUFFIXES_SELECTED_TO_PURGE_HISTORICAL.get());
+            for (String dn : availableSuffixes)
+            {
+              if (!Utils.areDnsEqual(dn,
+                  ADSContext.getAdministrationSuffixDN()) &&
+                  !Utils.areDnsEqual(dn, Constants.SCHEMA_DN) &&
+                  !Utils.areDnsEqual(dn, Constants.REPLICATION_CHANGES_DN))
+              {
+                try
+                {
+                  if (askConfirmation(
+                      INFO_REPLICATION_PURGE_HISTORICAL_PROMPT.get(dn), true,
+                      LOG))
+                  {
+                    suffixes.add(dn);
+                  }
+                }
+                catch (CLIException ce)
+                {
+                  println(ce.getMessageObject());
+                  confirmationLimitReached = true;
+                  break;
+                }
+              }
+            }
+          }
+          if (confirmationLimitReached)
+          {
+            suffixes.clear();
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Based on the data provided in the command-line it initializes replication
    * between two servers.
    * @return the error code if the operation failed and SUCCESSFUL if it was
@@ -811,6 +1517,186 @@
     return returnValue;
   }
 
+
+  /**
+   * Updates the contents of the provided PurgeHistoricalUserData
+   * object with the information provided in the command-line.  If some
+   * information is missing, ask the user to provide valid data.
+   * We assume that if this method is called we are in interactive mode.
+   * @param uData the object to be updated.
+   * @return <CODE>true</CODE> if the object was successfully updated and
+   * <CODE>false</CODE> if the user canceled the operation.
+   */
+  private boolean promptIfRequired(PurgeHistoricalUserData uData)
+  {
+    boolean cancelled = false;
+    boolean onlineSet = false;
+
+    boolean firstTry = true;
+    Boolean serverRunning = null;
+
+    InitialLdapContext ctx = null;
+    while (!cancelled && !onlineSet)
+    {
+      boolean promptForConnection = false;
+      if (argParser.connectionArgumentsPresent() && firstTry)
+      {
+        promptForConnection = true;
+      }
+      else
+      {
+        if (serverRunning == null)
+        {
+          serverRunning = Utilities.isServerRunning(
+              Installation.getLocal().getInstanceDirectory());
+        }
+        if (!serverRunning)
+        {
+          try
+          {
+            printlnProgress();
+            promptForConnection =
+              !askConfirmation(
+                  INFO_REPLICATION_PURGE_HISTORICAL_LOCAL_PROMPT.get(),
+                  true, LOG);
+          }
+          catch (CLIException ce)
+          {
+            println(ce.getMessageObject());
+            cancelled = true;
+          }
+        }
+        else
+        {
+          promptForConnection = true;
+        }
+      }
+      if (promptForConnection)
+      {
+        try
+        {
+          ci.run();
+          String host = ci.getHostName();
+          int port = ci.getPortNumber();
+          String adminUid = ci.getAdministratorUID();
+          String adminPwd = ci.getBindPassword();
+
+          ctx = createInitialLdapContextInteracting(ci);
+          if (ctx == null)
+          {
+            cancelled = true;
+          }
+          else
+          {
+            uData.setOnline(true);
+            uData.setAdminUid(adminUid);
+            uData.setAdminPwd(adminPwd);
+            uData.setHostName(host);
+            uData.setPort(port);
+            onlineSet = true;
+          }
+        }
+        catch (ClientException ce)
+        {
+          LOG.log(Level.WARNING, "Client exception "+ce);
+          println();
+          println(ce.getMessageObject());
+          println();
+          ci.resetConnectionArguments();
+        }
+        catch (ArgumentException ae)
+        {
+          LOG.log(Level.WARNING, "Argument exception "+ae);
+          println();
+          println(ae.getMessageObject());
+          println();
+          cancelled = true;
+        }
+      }
+      else
+      {
+        uData.setOnline(false);
+        onlineSet = true;
+      }
+      firstTry = false;
+    }
+
+    if (!cancelled)
+    {
+      int maximumDuration = argParser.getMaximumDuration();
+      /* Prompt for maximum duration */
+      if (!argParser.maximumDurationArg.isPresent())
+      {
+        printlnProgress();
+        maximumDuration = askInteger(
+            INFO_REPLICATION_PURGE_HISTORICAL_MAXIMUM_DURATION_PROMPT.get(),
+            argParser.getDefaultMaximumDuration(), LOG);
+      }
+      uData.setMaximumDuration(maximumDuration);
+    }
+
+    if (!cancelled)
+    {
+      LinkedList<String> suffixes = argParser.getBaseDNs();
+      if (uData.isOnline())
+      {
+        checkSuffixesForPurgeHistorical(suffixes, ctx, true);
+      }
+      else
+      {
+        checkSuffixesForLocalPurgeHistorical(suffixes, true);
+      }
+      cancelled = suffixes.isEmpty();
+      uData.setBaseDNs(suffixes);
+    }
+
+    if (uData.isOnline() && !cancelled)
+    {
+      List<? extends TaskEntry> taskEntries = getAvailableTaskEntries(ctx);
+
+      TaskScheduleInteraction interaction =
+        new TaskScheduleInteraction(uData.getTaskSchedule(), argParser.taskArgs,
+            this, formatter, taskEntries,
+            INFO_PURGE_HISTORICAL_TASK_NAME.get());
+      try
+      {
+        interaction.run();
+      }
+      catch (CLIException ce)
+      {
+        println(ce.getMessageObject());
+        cancelled = true;
+      }
+    }
+
+    if (ctx != null)
+    {
+      try
+      {
+        ctx.close();
+      }
+      catch (Throwable t)
+      {
+      }
+    }
+
+    return !cancelled;
+  }
+
+  private List<? extends TaskEntry> getAvailableTaskEntries(
+      InitialLdapContext ctx)
+  {
+    List<TaskEntry> taskEntries = new ArrayList<TaskEntry>();
+    List<OpenDsException> exceptions = new ArrayList<OpenDsException>();
+    ConfigFromDirContext cfg = new ConfigFromDirContext();
+    cfg.updateTaskInformation(ctx, exceptions, taskEntries);
+    for (OpenDsException ode : exceptions)
+    {
+      LOG.log(Level.WARNING, "Error retrieving task entries: "+ode, ode);
+    }
+    return taskEntries;
+  }
+
   /**
    * Updates the contents of the provided EnableReplicationUserData object
    * with the information provided in the command-line.  If some information
@@ -2734,7 +3620,7 @@
    * Initializes the contents of the provided status replication user data
    * object with what was provided in the command-line without prompting to the
    * user.
-   * @param uData the disable replication user data object to be initialized.
+   * @param uData the status replication user data object to be initialized.
    */
   private void initializeWithArgParser(StatusReplicationUserData uData)
   {
@@ -3525,7 +4411,7 @@
   }
 
   /**
-   * Disbles the replication in the server for the provided suffixes using the
+   * Disables the replication in the server for the provided suffixes using the
    * data in the DisableReplicationUserData object.  This method does not prompt
    * to the user for information if something is missing.
    * @param uData the DisableReplicationUserData object.
@@ -5560,7 +6446,7 @@
     // done).
     if (adsMergeDone)
     {
-      PointAdder pointAdder = new PointAdder();
+      PointAdder pointAdder = new PointAdder(this);
       printProgress(
           INFO_ENABLE_REPLICATION_INITIALIZING_ADS_ALL.get(
               ConnectionUtils.getHostPort(ctxSource)));
@@ -5606,7 +6492,7 @@
       }
       if (adsMergeDone)
       {
-        PointAdder pointAdder = new PointAdder();
+        PointAdder pointAdder = new PointAdder(this);
         printProgress(
             INFO_ENABLE_REPLICATION_INITIALIZING_SCHEMA.get(
                 ConnectionUtils.getHostPort(ctxDestination),
@@ -8968,12 +9854,7 @@
   private CommandBuilder createCommandBuilder(String subcommandName,
       ReplicationUserData uData) throws ArgumentException
   {
-    String commandName =
-      System.getProperty(ServerConstants.PROPERTY_SCRIPT_NAME);
-    if (commandName == null)
-    {
-      commandName = "dsreplication";
-    }
+    String commandName = getCommandName();
 
     CommandBuilder commandBuilder =
       new CommandBuilder(commandName, subcommandName);
@@ -8992,48 +9873,17 @@
       updateCommandBuilder(commandBuilder,
           (InitializeReplicationUserData)uData);
     }
+    else if (subcommandName.equals(
+        ReplicationCliArgumentParser.PURGE_HISTORICAL_SUBCMD_NAME))
+    {
+      // All the arguments for initialize replication are update here.
+      updateCommandBuilder(commandBuilder, (PurgeHistoricalUserData)uData);
+    }
     else
     {
       // Update the arguments used in the console interaction with the
       // actual arguments of dsreplication.
-      if ((ci != null) && (ci.getCommandBuilder() != null))
-      {
-        CommandBuilder interactionBuilder = ci.getCommandBuilder();
-        for (Argument arg : interactionBuilder.getArguments())
-        {
-          if (arg.getLongIdentifier().equals(OPTION_LONG_BINDPWD))
-          {
-            StringArgument bindPasswordArg = new StringArgument("adminPassword",
-                OPTION_SHORT_BINDPWD, "adminPassword", false, false, true,
-                INFO_BINDPWD_PLACEHOLDER.get(), null, null,
-                INFO_DESCRIPTION_REPLICATION_ADMIN_BINDPASSWORD.get());
-            bindPasswordArg.addValue(arg.getValue());
-            commandBuilder.addObfuscatedArgument(bindPasswordArg);
-          }
-          else if (arg.getLongIdentifier().equals(OPTION_LONG_BINDPWD_FILE))
-          {
-            FileBasedArgument bindPasswordFileArg = new FileBasedArgument(
-                "adminPasswordFile",
-                OPTION_SHORT_BINDPWD_FILE, "adminPasswordFile", false, false,
-                INFO_BINDPWD_FILE_PLACEHOLDER.get(), null, null,
-                INFO_DESCRIPTION_REPLICATION_ADMIN_BINDPASSWORDFILE.get());
-            bindPasswordFileArg.getNameToValueMap().putAll(
-                ((FileBasedArgument)arg).getNameToValueMap());
-            commandBuilder.addArgument(bindPasswordFileArg);
-          }
-          else
-          {
-            if (interactionBuilder.isObfuscated(arg))
-            {
-              commandBuilder.addObfuscatedArgument(arg);
-            }
-            else
-            {
-              commandBuilder.addArgument(arg);
-            }
-          }
-        }
-      }
+      updateCommandBuilderWithConsoleInteraction(commandBuilder, ci);
     }
 
     if (subcommandName.equals(
@@ -9063,6 +9913,97 @@
     return commandBuilder;
   }
 
+  private String getCommandName()
+  {
+    String commandName =
+      System.getProperty(ServerConstants.PROPERTY_SCRIPT_NAME);
+    if (commandName == null)
+    {
+      commandName = "dsreplication";
+    }
+    return commandName;
+  }
+
+  private void updateCommandBuilderWithConsoleInteraction(
+      CommandBuilder commandBuilder,
+      LDAPConnectionConsoleInteraction ci) throws ArgumentException
+  {
+    if ((ci != null) && (ci.getCommandBuilder() != null))
+    {
+      CommandBuilder interactionBuilder = ci.getCommandBuilder();
+      for (Argument arg : interactionBuilder.getArguments())
+      {
+        if (arg.getLongIdentifier().equals(OPTION_LONG_BINDPWD))
+        {
+          StringArgument bindPasswordArg = new StringArgument("adminPassword",
+              OPTION_SHORT_BINDPWD, "adminPassword", false, false, true,
+              INFO_BINDPWD_PLACEHOLDER.get(), null, null,
+              INFO_DESCRIPTION_REPLICATION_ADMIN_BINDPASSWORD.get());
+          bindPasswordArg.addValue(arg.getValue());
+          commandBuilder.addObfuscatedArgument(bindPasswordArg);
+        }
+        else if (arg.getLongIdentifier().equals(OPTION_LONG_BINDPWD_FILE))
+        {
+          FileBasedArgument bindPasswordFileArg = new FileBasedArgument(
+              "adminPasswordFile",
+              OPTION_SHORT_BINDPWD_FILE, "adminPasswordFile", false, false,
+              INFO_BINDPWD_FILE_PLACEHOLDER.get(), null, null,
+              INFO_DESCRIPTION_REPLICATION_ADMIN_BINDPASSWORDFILE.get());
+          bindPasswordFileArg.getNameToValueMap().putAll(
+              ((FileBasedArgument)arg).getNameToValueMap());
+          commandBuilder.addArgument(bindPasswordFileArg);
+        }
+        else
+        {
+          if (interactionBuilder.isObfuscated(arg))
+          {
+            commandBuilder.addObfuscatedArgument(arg);
+          }
+          else
+          {
+            commandBuilder.addArgument(arg);
+          }
+        }
+      }
+    }
+  }
+
+  private void updateCommandBuilder(CommandBuilder commandBuilder,
+      PurgeHistoricalUserData uData) throws ArgumentException
+  {
+    if (uData.isOnline())
+    {
+      updateCommandBuilderWithConsoleInteraction(commandBuilder, ci);
+      if (uData.getTaskSchedule() != null)
+      {
+        updateCommandBuilderWithTaskSchedule(commandBuilder,
+            uData.getTaskSchedule());
+      }
+    }
+
+    IntegerArgument maximumDurationArg = new IntegerArgument(
+        argParser.maximumDurationArg.getName(),
+        argParser.maximumDurationArg.getShortIdentifier(),
+        argParser.maximumDurationArg.getLongIdentifier(),
+        argParser.maximumDurationArg.isRequired(),
+        argParser.maximumDurationArg.isMultiValued(),
+        argParser.maximumDurationArg.needsValue(),
+        argParser.maximumDurationArg.getValuePlaceholder(),
+        PurgeConflictsHistoricalTask.DEFAULT_MAX_DURATION,
+        argParser.maximumDurationArg.getPropertyName(),
+        argParser.maximumDurationArg.getDescription());
+    maximumDurationArg.addValue(String.valueOf(uData.getMaximumDuration()));
+    commandBuilder.addArgument(maximumDurationArg);
+  }
+
+  private void updateCommandBuilderWithTaskSchedule(
+      CommandBuilder commandBuilder,
+      TaskScheduleUserData taskSchedule)
+  {
+    TaskScheduleUserData.updateCommandBuilderWithTaskSchedule(
+        commandBuilder, taskSchedule);
+  }
+
   private void addGlobalArguments(CommandBuilder commandBuilder,
       ReplicationUserData uData)
   throws ArgumentException
@@ -10064,7 +11005,7 @@
   private boolean mergeRegistries(ADSContext adsCtx1, ADSContext adsCtx2)
   throws ReplicationCliException
   {
-    PointAdder pointAdder = new PointAdder();
+    PointAdder pointAdder = new PointAdder(this);
     try
     {
       LinkedHashSet<PreferredConnection> cnx =
@@ -10516,6 +11457,52 @@
   {
     return argParser.getConnectTimeout();
   }
+
+  private String binDir;
+  /**
+   * Returns the binary/script directory.
+   * @return the binary/script directory.
+   */
+  private String getBinaryDir()
+  {
+    if (binDir == null)
+    {
+      File f = Installation.getLocal().getBinariesDirectory();
+      try
+      {
+        binDir = f.getCanonicalPath();
+      }
+      catch (Throwable t)
+      {
+        binDir = f.getAbsolutePath();
+      }
+      if (binDir.lastIndexOf(File.separatorChar) != (binDir.length() - 1))
+      {
+        binDir += File.separatorChar;
+      }
+    }
+
+    return binDir;
+  }
+
+  /**
+   * Returns the full path of the command-line for a given script name.
+   * @param scriptBasicName the script basic name (with no extension).
+   * @return the full path of the command-line for a given script name.
+   */
+  private String getCommandLinePath(String scriptBasicName)
+  {
+    String cmdLineName;
+    if (Utilities.isWindows())
+    {
+      cmdLineName = getBinaryDir()+scriptBasicName+".bat";
+    }
+    else
+    {
+      cmdLineName = getBinaryDir()+scriptBasicName;
+    }
+    return cmdLineName;
+  }
 }
 
 

--
Gitblit v1.10.0