mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.07.2014 4f4948ee52a68b9efe6bb4584bc95f9f25b0fa5c
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,19 +27,17 @@
import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -835,6 +833,7 @@
   */
  private final class ChangelogDBPurger extends DirectoryThread
  {
    private static final int DEFAULT_SLEEP = 500;
    protected ChangelogDBPurger()
    {
@@ -851,42 +850,58 @@
      {
        try
        {
          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
          if (localCNIndexDB == null)
          { // shutdown has been called
            return;
          }
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final MultiDomainServerState purgeUpToCookie =
              localCNIndexDB.purgeUpTo(purgeTimestamp);
          if (purgeUpToCookie == null)
          { // this can happen when the change number index DB is empty
            continue;
          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
          final CSN oldestNotPurgedCSN;
          // next code assumes that the compute-change-number config
          // never changes during the life time of an RS
          if (!config.isComputeChangeNumber())
          {
            oldestNotPurgedCSN = purgeCSN;
          }
          else
          {
            final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
            if (localCNIndexDB == null)
            { // shutdown has been initiated
              return;
            }
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              if (!isShutdownInitiated())
              {
                // ... or the change number index DB is empty,
                // wait for new changes to come in.
                // Note we cannot sleep for as long as the purge delay
                // (3 days default), because we might receive late updates
                // that will have to be purged before the purge delay elapses.
                // This can particularly happen in case of network partitions.
                sleep(DEFAULT_SLEEP);
              }
              continue;
            }
          }
          /*
           * Drive purge of the replica DBs by the oldest non purged cookie in
           * the change number index DB.
           */
          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
              : domainToReplicaDBs.entrySet())
          for (final Map<Integer, JEReplicaDB> domainMap
              : domainToReplicaDBs.values())
          {
            final DN baseDN = entry1.getKey();
            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
            for (final JEReplicaDB replicaDB : domainMap.values())
            {
              final Integer serverId = entry2.getKey();
              final JEReplicaDB replicaDB = entry2.getValue();
              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
              replicaDB.purgeUpTo(oldestNotPurgedCSN);
            }
          }
          latestPurgeDate = purgeTimestamp;
          // purge delay is specified in seconds so it should not be a problem
          // to sleep for 500 millis
          sleep(500);
          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
          // shutdown initiated?
        }
        catch (Exception e)
        {
@@ -898,5 +913,18 @@
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
      if (currentPurgeTime <= nextPurgeTime)
      {
        // sleep till the next CSN to purge,
        return nextPurgeTime - currentPurgeTime;
      }
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
  }
}