mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
18.27.2010 b9aad30c9e07b179a2c22fad830f6a54b8993bc9
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -24,6 +24,7 @@
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -62,9 +63,13 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import java.util.Date;
import org.opends.server.util.TimeThread;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
@@ -162,6 +167,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.*;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
/**
 *  This class implements the bulk part of the.of the Directory Server side
@@ -239,7 +245,7 @@
   */
  public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
  /**
 /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
@@ -408,6 +414,18 @@
  // ChangeNumbers.
  private boolean logChangeNumber = false;
  // This configuration integer indicates the time the domain keeps the
  // historical information necessary to solve conflicts.
  // When a change stored in the historical part of the user entry has a date
  // (from its replication ChangeNumber) older than this delay, it is candidate
  // to be purged.
  private long histPurgeDelayInMilliSec = 0;
  // The last change number purged in this domain. Allows to have a continuous
  // purging process from one purge processing (task run) to the next one.
  // Values 0 when the server starts.
  private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
  /**
   * The thread that periodically saves the ServerState of this
   * LDAPReplicationDomain in the database.
@@ -547,14 +565,17 @@
    // Read the configuration parameters.
    Set<String> replicationServers = configuration.getReplicationServer();
    serverId = configuration.getServerId();
    baseDn = configuration.getBaseDN();
    this.serverId = configuration.getServerId();
    this.baseDn = configuration.getBaseDN();
    int window  = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    isolationpolicy = configuration.getIsolationPolicy();
    configDn = configuration.dn();
    logChangeNumber = configuration.isLogChangenumber();
    this.isolationpolicy = configuration.getIsolationPolicy();
    this.configDn = configuration.dn();
    this.logChangeNumber = configuration.isLogChangenumber();
    this.updateToReplayQueue = updateToReplayQueue;
    this.histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
    // Get assured configuration
    readAssuredConfig(configuration, false);
@@ -2423,6 +2444,12 @@
   * Check if the operation that just happened has cleared a conflict :
   * Clearing a conflict happens if the operation has free a DN that
   * for which an other entry was in conflict.
   * Steps:
   * - get the DN freed by a DELETE or MODRDN op
   * - search for entries put in the conflict space (dn=entryuid'+'....)
   *   because the expected DN was not available (ds-sync-conflict=expected DN)
   * - retain the entry with the oldest conflict
   * - rename this entry with the freedDN as it was expected originally
   */
   private void checkForClearedConflict(PostOperationOperation op)
   {
@@ -2433,14 +2460,14 @@
       return;
     }
     DN targetDN;
     DN freedDN;
     if (type == OperationType.DELETE)
     {
       targetDN = ((PostOperationDeleteOperation) op).getEntryDN();
       freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
     }
     else if (type == OperationType.MODIFY_DN)
     {
       targetDN = ((PostOperationModifyDNOperation) op).getEntryDN();
       freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
     }
     else
     {
@@ -2451,7 +2478,7 @@
    try
    {
      filter = LDAPFilter.decode(
          DS_SYNC_CONFLICT + "=" + targetDN.toNormalizedString());
          DS_SYNC_CONFLICT + "=" + freedDN.toNormalizedString());
    } catch (LDAPException e)
    {
      // Not possible. We know the filter just above is correct.
@@ -2491,7 +2518,7 @@
     {
       DN entryDN = entrytoRename.getDN();
       ModifyDNOperationBasis newOp = renameEntry(
           entryDN, targetDN.getRDN(), targetDN.getParent(), false);
           entryDN, freedDN.getRDN(), freedDN.getParent(), false);
       ResultCode res = newOp.getResultCode();
       if (res != ResultCode.SUCCESS)
@@ -4410,6 +4437,8 @@
  {
    isolationpolicy = configuration.getIsolationPolicy();
    logChangeNumber = configuration.isLogChangenumber();
    histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
    changeConfig(
        configuration.getReplicationServer(),
@@ -5588,4 +5617,122 @@
  {
    return this.eclDomain.isEnabled();
  }
  /**
   * Return the purge delay (in ms) for the historical information stored
   * in entries to solve conflicts for this domain.
   *
   * @return the purge delay.
   */
  public long getHistoricalPurgeDelay()
  {
    return histPurgeDelayInMilliSec;
  }
  /**
   * Check if the operation that just happened has cleared a conflict :
   * Clearing a conflict happens if the operation has free a DN that
   * for which an other entry was in conflict.
   * Steps:
   * - get the DN freed by a DELETE or MODRDN op
   * - search for entries put in the conflict space (dn=entryuid'+'....)
   *   because the expected DN was not available (ds-sync-conflict=expected DN)
   * - retain the entry with the oldest conflict
   * - rename this entry with the freedDN as it was expected originally
   *
   * @param task     the task raising this purge.
   * @param endDate  the date to stop this task whether the job is done or not.
   * @throws DirectoryException when an exception happens.
   *
   */
   public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
       long endDate)
   throws DirectoryException
   {
     LDAPFilter filter = null;
     TRACER.debugInfo("[PURGE] purgeConflictsHistorical "
         + "on domain: " + baseDn
         + "endDate:" + new Date(endDate)
         + "lastChangeNumberPurgedFromHist: "
         + lastChangeNumberPurgedFromHist.toStringUI());
     try
     {
       filter = LDAPFilter.decode(
         "(" + EntryHistorical.HISTORICALATTRIBUTENAME + ">=dummy:"
         + lastChangeNumberPurgedFromHist + ")");
     } catch (LDAPException e)
     {
       // Not possible. We know the filter just above is correct.
     }
     LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
     attrs.add(EntryHistorical.HISTORICALATTRIBUTENAME);
     attrs.add(EntryHistorical.ENTRYUIDNAME);
     attrs.add("*");
     InternalSearchOperation searchOp =  conn.processSearch(
         ByteString.valueOf(baseDn.toString()),
         SearchScope.WHOLE_SUBTREE,
         DereferencePolicy.NEVER_DEREF_ALIASES,
         0, 0, false, filter,
         attrs, null);
     int count = 0;
     task.setProgressStats(lastChangeNumberPurgedFromHist, count);
     LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
     for (SearchResultEntry entry : entries)
     {
       long maxTimeToRun = endDate - TimeThread.getTime();
       if (maxTimeToRun<0)
       {
         Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
             " end date reached");
         DirectoryException de = new DirectoryException(
             ResultCode.ADMIN_LIMIT_EXCEEDED,
             errMsg);
         throw (de);
       }
       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
       lastChangeNumberPurgedFromHist = entryHist.getOldestCN();
       entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
       Attribute attr = entryHist.encodeAndPurge();
       count += entryHist.getLastPurgedValuesCount();
       List<Modification> mods = new LinkedList<Modification>();
       Modification mod;
       mod = new Modification(ModificationType.REPLACE, attr);
       mods.add(mod);
       ModifyOperationBasis newOp =
         new ModifyOperationBasis(
             conn, InternalClientConnection.nextOperationID(),
             InternalClientConnection.nextMessageID(),
             new ArrayList<Control>(0),
             entry.getDN(),
             mods);
       newOp.setInternalOperation(true);
       newOp.setSynchronizationOperation(true);
       newOp.setDontSynchronize(true);
       newOp.run();
       if (newOp.getResultCode() != ResultCode.SUCCESS)
       {
         // Log information for the repair tool.
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get());
         mb.append(String.valueOf(newOp));
         mb.append(" ");
         mb.append(String.valueOf(newOp.getResultCode()));
         logError(mb.toMessage());
       }
       else
       {
         task.setProgressStats(lastChangeNumberPurgedFromHist, count);
       }
     }
   }
}