From 891159050af4aa3fe47c67e3ba7d3f21299027a4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 02 Dec 2013 14:01:32 +0000
Subject: [PATCH] OPENDJ-1174 (CR-2631) Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |   84 ++++++++++++++++++++++++++++++++++++-----
 1 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 2e00f89..aa986ff 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -31,9 +31,11 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
@@ -79,7 +81,7 @@
       domainToReplicaDBs =
           new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
   private ReplicationDbEnv dbEnv;
-  private final String dbDirectoryName;
+  private ReplicationServerCfg config;
   private final File dbDirectory;
 
   /**
@@ -89,6 +91,8 @@
    * Guarded by cnIndexDBLock
    */
   private JEChangeNumberIndexDB cnIndexDB;
+  private final AtomicReference<ChangeNumberIndexer> cnIndexer =
+      new AtomicReference<ChangeNumberIndexer>();
 
   /** Used for protecting {@link ChangeNumberIndexDB} related state. */
   private final Object cnIndexDBLock = new Object();
@@ -131,17 +135,17 @@
    *
    * @param replicationServer
    *          the local replication server.
-   * @param dbDirName
-   *          the directory for use by the replication database
+   * @param config
+   *          the replication server configuration
    * @throws ConfigException
    *           if a problem occurs opening the supplied directory
    */
-  public JEChangelogDB(ReplicationServer replicationServer, String dbDirName)
-      throws ConfigException
+  public JEChangelogDB(ReplicationServer replicationServer,
+      ReplicationServerCfg config) throws ConfigException
   {
+    this.config = config;
     this.replicationServer = replicationServer;
-    this.dbDirectoryName = dbDirName != null ? dbDirName : "changelogDb";
-    this.dbDirectory = makeDir(this.dbDirectoryName);
+    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
   private File makeDir(String dbDirName) throws ConfigException
@@ -303,9 +307,19 @@
   {
     try
     {
-      dbEnv = new ReplicationDbEnv(
-          getFileForPath(dbDirectoryName).getAbsolutePath(), replicationServer);
-      initializeChangelogState(dbEnv.readChangelogState());
+      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
+      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+      final ChangelogState changelogState = dbEnv.readChangelogState();
+      initializeChangelogState(changelogState);
+      if (config.isComputeChangenumber())
+      {
+        final ChangeNumberIndexer indexer =
+            new ChangeNumberIndexer(this, changelogState);
+        if (cnIndexer.compareAndSet(null, indexer))
+        {
+          indexer.start();
+        }
+      }
     }
     catch (ChangelogException e)
     {
@@ -361,6 +375,12 @@
     // - then throw the first encountered exception
     ChangelogException firstException = null;
 
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      indexer.initiateShutdown();
+      cnIndexer.compareAndSet(indexer, null);
+    }
     try
     {
       shutdownCNIndexDB();
@@ -411,6 +431,12 @@
     // - then throw the first encountered exception
     ChangelogException firstException = null;
 
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      indexer.clear();
+    }
+
     for (DN baseDN : this.domainToReplicaDBs.keySet())
     {
       removeDomain(baseDN);
@@ -617,6 +643,11 @@
   @Override
   public void setPurgeDelay(long delay)
   {
+    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
+    if (cnIndexDB != null)
+    {
+      cnIndexDB.setPurgeDelay(delay);
+    }
     for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
     {
       for (JEReplicaDB replicaDB : domainMap.values())
@@ -628,6 +659,31 @@
 
   /** {@inheritDoc} */
   @Override
+  public void setComputeChangeNumber(boolean computeChangeNumber)
+      throws ChangelogException
+  {
+    final ChangeNumberIndexer indexer;
+    if (computeChangeNumber)
+    {
+      final ChangelogState changelogState = dbEnv.readChangelogState();
+      indexer = new ChangeNumberIndexer(this, changelogState);
+      if (cnIndexer.compareAndSet(null, indexer))
+      {
+        indexer.start();
+      }
+    }
+    else
+    {
+      indexer = cnIndexer.getAndSet(null);
+      if (indexer != null)
+      {
+        indexer.initiateShutdown();
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public long getDomainLatestTrimDate(DN baseDN)
   {
     long latest = 0;
@@ -693,7 +749,8 @@
     for (int serverId : serverIds)
     {
       // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState.getCSN(serverId);
+      final CSN lastCSN = startAfterServerState != null ?
+          startAfterServerState.getCSN(serverId) : null;
       cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
     }
     return new CompositeDBCursor<Void>(cursors);
@@ -751,6 +808,11 @@
     final boolean wasCreated = pair.getSecond();
 
     replicaDB.add(updateMsg);
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      indexer.publishUpdateMsg(baseDN, updateMsg);
+    }
     return wasCreated;
   }
 

--
Gitblit v1.10.0