From 891159050af4aa3fe47c67e3ba7d3f21299027a4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 02 Dec 2013 14:01:32 +0000
Subject: [PATCH] OPENDJ-1174 (CR-2631) Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 84 ++++++++++++++++++++++++++++++++++++-----
1 files changed, 73 insertions(+), 11 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 2e00f89..aa986ff 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -31,9 +31,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
@@ -79,7 +81,7 @@
domainToReplicaDBs =
new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
private ReplicationDbEnv dbEnv;
- private final String dbDirectoryName;
+ private ReplicationServerCfg config;
private final File dbDirectory;
/**
@@ -89,6 +91,8 @@
* Guarded by cnIndexDBLock
*/
private JEChangeNumberIndexDB cnIndexDB;
+ private final AtomicReference<ChangeNumberIndexer> cnIndexer =
+ new AtomicReference<ChangeNumberIndexer>();
/** Used for protecting {@link ChangeNumberIndexDB} related state. */
private final Object cnIndexDBLock = new Object();
@@ -131,17 +135,17 @@
*
* @param replicationServer
* the local replication server.
- * @param dbDirName
- * the directory for use by the replication database
+ * @param config
+ * the replication server configuration
* @throws ConfigException
* if a problem occurs opening the supplied directory
*/
- public JEChangelogDB(ReplicationServer replicationServer, String dbDirName)
- throws ConfigException
+ public JEChangelogDB(ReplicationServer replicationServer,
+ ReplicationServerCfg config) throws ConfigException
{
+ this.config = config;
this.replicationServer = replicationServer;
- this.dbDirectoryName = dbDirName != null ? dbDirName : "changelogDb";
- this.dbDirectory = makeDir(this.dbDirectoryName);
+ this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
private File makeDir(String dbDirName) throws ConfigException
@@ -303,9 +307,19 @@
{
try
{
- dbEnv = new ReplicationDbEnv(
- getFileForPath(dbDirectoryName).getAbsolutePath(), replicationServer);
- initializeChangelogState(dbEnv.readChangelogState());
+ final File dbDir = getFileForPath(config.getReplicationDBDirectory());
+ dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+ final ChangelogState changelogState = dbEnv.readChangelogState();
+ initializeChangelogState(changelogState);
+ if (config.isComputeChangenumber())
+ {
+ final ChangeNumberIndexer indexer =
+ new ChangeNumberIndexer(this, changelogState);
+ if (cnIndexer.compareAndSet(null, indexer))
+ {
+ indexer.start();
+ }
+ }
}
catch (ChangelogException e)
{
@@ -361,6 +375,12 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ cnIndexer.compareAndSet(indexer, null);
+ }
try
{
shutdownCNIndexDB();
@@ -411,6 +431,12 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.clear();
+ }
+
for (DN baseDN : this.domainToReplicaDBs.keySet())
{
removeDomain(baseDN);
@@ -617,6 +643,11 @@
@Override
public void setPurgeDelay(long delay)
{
+ final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
+ if (cnIndexDB != null)
+ {
+ cnIndexDB.setPurgeDelay(delay);
+ }
for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
{
for (JEReplicaDB replicaDB : domainMap.values())
@@ -628,6 +659,31 @@
/** {@inheritDoc} */
@Override
+ public void setComputeChangeNumber(boolean computeChangeNumber)
+ throws ChangelogException
+ {
+ final ChangeNumberIndexer indexer;
+ if (computeChangeNumber)
+ {
+ final ChangelogState changelogState = dbEnv.readChangelogState();
+ indexer = new ChangeNumberIndexer(this, changelogState);
+ if (cnIndexer.compareAndSet(null, indexer))
+ {
+ indexer.start();
+ }
+ }
+ else
+ {
+ indexer = cnIndexer.getAndSet(null);
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
public long getDomainLatestTrimDate(DN baseDN)
{
long latest = 0;
@@ -693,7 +749,8 @@
for (int serverId : serverIds)
{
// get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState.getCSN(serverId);
+ final CSN lastCSN = startAfterServerState != null ?
+ startAfterServerState.getCSN(serverId) : null;
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
}
return new CompositeDBCursor<Void>(cursors);
@@ -751,6 +808,11 @@
final boolean wasCreated = pair.getSecond();
replicaDB.add(updateMsg);
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.publishUpdateMsg(baseDN, updateMsg);
+ }
return wasCreated;
}
--
Gitblit v1.10.0