From 7067ad3cb534be96af07817eb5e9e270ae6efcd2 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Wed, 18 Aug 2010 13:27:32 +0000
Subject: [PATCH] First pass on solving issue #514, reducing Replication meta data, especially aged information. The default is to keep the replication meta data (historical information) for at least 1 day. Purging occurs on the fly when entries are modified, or via a task. Launching the task will be available through dsreplication in separate commit. 

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |  169 ++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 158 insertions(+), 11 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 7ad8093..f4bc7a4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -24,6 +24,7 @@
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
+
 package org.opends.server.replication.plugin;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -62,9 +63,13 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
+import java.util.Date;
 
+import org.opends.server.util.TimeThread;
+import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
 import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
@@ -162,6 +167,7 @@
 import org.opends.server.util.LDIFReader;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 import org.opends.server.workflowelement.localbackend.*;
+import org.opends.server.tasks.PurgeConflictsHistoricalTask;
 
 /**
  *  This class implements the bulk part of the.of the Directory Server side
@@ -239,7 +245,7 @@
    */
   public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
 
-  /**
+ /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
@@ -408,6 +414,18 @@
   // ChangeNumbers.
   private boolean logChangeNumber = false;
 
+  // This configuration integer indicates the time the domain keeps the
+  // historical information necessary to solve conflicts.
+  // When a change stored in the historical part of the user entry has a date
+  // (from its replication ChangeNumber) older than this delay, it is candidate
+  // to be purged.
+  private long histPurgeDelayInMilliSec = 0;
+
+  // The last change number purged in this domain. Allows to have a continuous
+  // purging process from one purge processing (task run) to the next one.
+  // Values 0 when the server starts.
+  private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
+
   /**
    * The thread that periodically saves the ServerState of this
    * LDAPReplicationDomain in the database.
@@ -547,14 +565,17 @@
 
     // Read the configuration parameters.
     Set<String> replicationServers = configuration.getReplicationServer();
-    serverId = configuration.getServerId();
-    baseDn = configuration.getBaseDN();
+
+    this.serverId = configuration.getServerId();
+    this.baseDn = configuration.getBaseDN();
     int window  = configuration.getWindowSize();
     heartbeatInterval = configuration.getHeartbeatInterval();
-    isolationpolicy = configuration.getIsolationPolicy();
-    configDn = configuration.dn();
-    logChangeNumber = configuration.isLogChangenumber();
+    this.isolationpolicy = configuration.getIsolationPolicy();
+    this.configDn = configuration.dn();
+    this.logChangeNumber = configuration.isLogChangenumber();
     this.updateToReplayQueue = updateToReplayQueue;
+    this.histPurgeDelayInMilliSec =
+      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
 
     // Get assured configuration
     readAssuredConfig(configuration, false);
@@ -2423,6 +2444,12 @@
    * Check if the operation that just happened has cleared a conflict :
    * Clearing a conflict happens if the operation has free a DN that
    * for which an other entry was in conflict.
+   * Steps:
+   * - get the DN freed by a DELETE or MODRDN op
+   * - search for entries put in the conflict space (dn=entryuid'+'....)
+   *   because the expected DN was not available (ds-sync-conflict=expected DN)
+   * - retain the entry with the oldest conflict
+   * - rename this entry with the freedDN as it was expected originally
    */
    private void checkForClearedConflict(PostOperationOperation op)
    {
@@ -2433,14 +2460,14 @@
        return;
      }
 
-     DN targetDN;
+     DN freedDN;
      if (type == OperationType.DELETE)
      {
-       targetDN = ((PostOperationDeleteOperation) op).getEntryDN();
+       freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
      }
      else if (type == OperationType.MODIFY_DN)
      {
-       targetDN = ((PostOperationModifyDNOperation) op).getEntryDN();
+       freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
      }
      else
      {
@@ -2451,7 +2478,7 @@
     try
     {
       filter = LDAPFilter.decode(
-          DS_SYNC_CONFLICT + "=" + targetDN.toNormalizedString());
+          DS_SYNC_CONFLICT + "=" + freedDN.toNormalizedString());
     } catch (LDAPException e)
     {
       // Not possible. We know the filter just above is correct.
@@ -2491,7 +2518,7 @@
      {
        DN entryDN = entrytoRename.getDN();
        ModifyDNOperationBasis newOp = renameEntry(
-           entryDN, targetDN.getRDN(), targetDN.getParent(), false);
+           entryDN, freedDN.getRDN(), freedDN.getParent(), false);
 
        ResultCode res = newOp.getResultCode();
        if (res != ResultCode.SUCCESS)
@@ -4410,6 +4437,8 @@
   {
     isolationpolicy = configuration.getIsolationPolicy();
     logChangeNumber = configuration.isLogChangenumber();
+    histPurgeDelayInMilliSec =
+      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
 
     changeConfig(
         configuration.getReplicationServer(),
@@ -5588,4 +5617,122 @@
   {
     return this.eclDomain.isEnabled();
   }
+
+  /**
+   * Return the purge delay (in ms) for the historical information stored
+   * in entries to solve conflicts for this domain.
+   *
+   * @return the purge delay.
+   */
+  public long getHistoricalPurgeDelay()
+  {
+    return histPurgeDelayInMilliSec;
+  }
+
+  /**
+   * Check if the operation that just happened has cleared a conflict :
+   * Clearing a conflict happens if the operation has free a DN that
+   * for which an other entry was in conflict.
+   * Steps:
+   * - get the DN freed by a DELETE or MODRDN op
+   * - search for entries put in the conflict space (dn=entryuid'+'....)
+   *   because the expected DN was not available (ds-sync-conflict=expected DN)
+   * - retain the entry with the oldest conflict
+   * - rename this entry with the freedDN as it was expected originally
+   *
+   * @param task     the task raising this purge.
+   * @param endDate  the date to stop this task whether the job is done or not.
+   * @throws DirectoryException when an exception happens.
+   *
+   */
+   public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
+       long endDate)
+   throws DirectoryException
+   {
+     LDAPFilter filter = null;
+
+     TRACER.debugInfo("[PURGE] purgeConflictsHistorical "
+         + "on domain: " + baseDn
+         + "endDate:" + new Date(endDate)
+         + "lastChangeNumberPurgedFromHist: "
+         + lastChangeNumberPurgedFromHist.toStringUI());
+
+     try
+     {
+       filter = LDAPFilter.decode(
+         "(" + EntryHistorical.HISTORICALATTRIBUTENAME + ">=dummy:"
+         + lastChangeNumberPurgedFromHist + ")");
+
+     } catch (LDAPException e)
+     {
+       // Not possible. We know the filter just above is correct.
+     }
+
+     LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+     attrs.add(EntryHistorical.HISTORICALATTRIBUTENAME);
+     attrs.add(EntryHistorical.ENTRYUIDNAME);
+     attrs.add("*");
+     InternalSearchOperation searchOp =  conn.processSearch(
+         ByteString.valueOf(baseDn.toString()),
+         SearchScope.WHOLE_SUBTREE,
+         DereferencePolicy.NEVER_DEREF_ALIASES,
+         0, 0, false, filter,
+         attrs, null);
+
+     int count = 0;
+     task.setProgressStats(lastChangeNumberPurgedFromHist, count);
+
+     LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
+     for (SearchResultEntry entry : entries)
+     {
+       long maxTimeToRun = endDate - TimeThread.getTime();
+       if (maxTimeToRun<0)
+       {
+         Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
+             " end date reached");
+         DirectoryException de = new DirectoryException(
+             ResultCode.ADMIN_LIMIT_EXCEEDED,
+             errMsg);
+         throw (de);
+       }
+
+       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
+       lastChangeNumberPurgedFromHist = entryHist.getOldestCN();
+       entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
+       Attribute attr = entryHist.encodeAndPurge();
+       count += entryHist.getLastPurgedValuesCount();
+       List<Modification> mods = new LinkedList<Modification>();
+       Modification mod;
+       mod = new Modification(ModificationType.REPLACE, attr);
+       mods.add(mod);
+
+       ModifyOperationBasis newOp =
+         new ModifyOperationBasis(
+             conn, InternalClientConnection.nextOperationID(),
+             InternalClientConnection.nextMessageID(),
+             new ArrayList<Control>(0),
+             entry.getDN(),
+             mods);
+       newOp.setInternalOperation(true);
+       newOp.setSynchronizationOperation(true);
+       newOp.setDontSynchronize(true);
+
+       newOp.run();
+
+       if (newOp.getResultCode() != ResultCode.SUCCESS)
+       {
+         // Log information for the repair tool.
+         MessageBuilder mb = new MessageBuilder();
+         mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get());
+         mb.append(String.valueOf(newOp));
+         mb.append(" ");
+         mb.append(String.valueOf(newOp.getResultCode()));
+         logError(mb.toMessage());
+       }
+       else
+       {
+         task.setProgressStats(lastChangeNumberPurgedFromHist, count);
+       }
+     }
+   }
 }

--
Gitblit v1.10.0