mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.01.2013 891159050af4aa3fe47c67e3ba7d3f21299027a4
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -31,9 +31,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
@@ -79,7 +81,7 @@
      domainToReplicaDBs =
          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private ReplicationServerCfg config;
  private final File dbDirectory;
  /**
@@ -89,6 +91,8 @@
   * Guarded by cnIndexDBLock
   */
  private JEChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer =
      new AtomicReference<ChangeNumberIndexer>();
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
@@ -131,17 +135,17 @@
   *
   * @param replicationServer
   *          the local replication server.
   * @param dbDirName
   *          the directory for use by the replication database
   * @param config
   *          the replication server configuration
   * @throws ConfigException
   *           if a problem occurs opening the supplied directory
   */
  public JEChangelogDB(ReplicationServer replicationServer, String dbDirName)
      throws ConfigException
  public JEChangelogDB(ReplicationServer replicationServer,
      ReplicationServerCfg config) throws ConfigException
  {
    this.config = config;
    this.replicationServer = replicationServer;
    this.dbDirectoryName = dbDirName != null ? dbDirName : "changelogDb";
    this.dbDirectory = makeDir(this.dbDirectoryName);
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
  private File makeDir(String dbDirName) throws ConfigException
@@ -303,9 +307,19 @@
  {
    try
    {
      dbEnv = new ReplicationDbEnv(
          getFileForPath(dbDirectoryName).getAbsolutePath(), replicationServer);
      initializeChangelogState(dbEnv.readChangelogState());
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.readChangelogState();
      initializeChangelogState(changelogState);
      if (config.isComputeChangenumber())
      {
        final ChangeNumberIndexer indexer =
            new ChangeNumberIndexer(this, changelogState);
        if (cnIndexer.compareAndSet(null, indexer))
        {
          indexer.start();
        }
      }
    }
    catch (ChangelogException e)
    {
@@ -361,6 +375,12 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.initiateShutdown();
      cnIndexer.compareAndSet(indexer, null);
    }
    try
    {
      shutdownCNIndexDB();
@@ -411,6 +431,12 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.clear();
    }
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
@@ -617,6 +643,11 @@
  @Override
  public void setPurgeDelay(long delay)
  {
    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
    if (cnIndexDB != null)
    {
      cnIndexDB.setPurgeDelay(delay);
    }
    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
    {
      for (JEReplicaDB replicaDB : domainMap.values())
@@ -628,6 +659,31 @@
  /** {@inheritDoc} */
  @Override
  public void setComputeChangeNumber(boolean computeChangeNumber)
      throws ChangelogException
  {
    final ChangeNumberIndexer indexer;
    if (computeChangeNumber)
    {
      final ChangelogState changelogState = dbEnv.readChangelogState();
      indexer = new ChangeNumberIndexer(this, changelogState);
      if (cnIndexer.compareAndSet(null, indexer))
      {
        indexer.start();
      }
    }
    else
    {
      indexer = cnIndexer.getAndSet(null);
      if (indexer != null)
      {
        indexer.initiateShutdown();
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
  {
    long latest = 0;
@@ -693,7 +749,8 @@
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState.getCSN(serverId);
      final CSN lastCSN = startAfterServerState != null ?
          startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    return new CompositeDBCursor<Void>(cursors);
@@ -751,6 +808,11 @@
    final boolean wasCreated = pair.getSecond();
    replicaDB.add(updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
  }