mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.30.2014 aacb8bbf0a764ce8eb205e0f6376c055b3e1baa8
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@
import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -64,7 +62,7 @@
public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  /** The tracer object for the debug logger. */
  protected static final DebugTracer TRACER = getTracer();
  private static final DebugTracer TRACER = getTracer();
  /**
   * This map contains the List of updates received from each LDAP server.
@@ -846,6 +844,7 @@
   */
  private final class ChangelogDBPurger extends DirectoryThread
  {
    private static final int DEFAULT_SLEEP = 500;
    protected ChangelogDBPurger()
    {
@@ -862,42 +861,58 @@
      {
        try
        {
          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
          if (localCNIndexDB == null)
          { // shutdown has been called
            return;
          }
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final MultiDomainServerState purgeUpToCookie =
              localCNIndexDB.purgeUpTo(purgeTimestamp);
          if (purgeUpToCookie == null)
          { // this can happen when the change number index DB is empty
            continue;
          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
          final CSN oldestNotPurgedCSN;
          // next code assumes that the compute-change-number config
          // never changes during the life time of an RS
          if (!config.isComputeChangeNumber())
          {
            oldestNotPurgedCSN = purgeCSN;
          }
          else
          {
            final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
            if (localCNIndexDB == null)
            { // shutdown has been initiated
              return;
            }
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              if (!isShutdownInitiated())
              {
                // ... or the change number index DB is empty,
                // wait for new changes to come in.
                // Note we cannot sleep for as long as the purge delay
                // (3 days default), because we might receive late updates
                // that will have to be purged before the purge delay elapses.
                // This can particularly happen in case of network partitions.
                sleep(DEFAULT_SLEEP);
              }
              continue;
            }
          }
          /*
           * Drive purge of the replica DBs by the oldest non purged cookie in
           * the change number index DB.
           */
          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
              : domainToReplicaDBs.entrySet())
          for (final Map<Integer, JEReplicaDB> domainMap
              : domainToReplicaDBs.values())
          {
            final DN baseDN = entry1.getKey();
            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
            for (final JEReplicaDB replicaDB : domainMap.values())
            {
              final Integer serverId = entry2.getKey();
              final JEReplicaDB replicaDB = entry2.getValue();
              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
              replicaDB.purgeUpTo(oldestNotPurgedCSN);
            }
          }
          latestPurgeDate = purgeTimestamp;
          // purge delay is specified in seconds so it should not be a problem
          // to sleep for 500 millis
          sleep(500);
          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
          // shutdown initiated?
        }
        catch (Exception e)
        {
@@ -910,5 +925,18 @@
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
      if (currentPurgeTime <= nextPurgeTime)
      {
        // sleep till the next CSN to purge,
        return nextPurgeTime - currentPurgeTime;
      }
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
  }
}