From 5628ce31b6a1cc9b0938230ad96d0caa6725c2c2 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 24 Jun 2014 15:00:24 +0000
Subject: [PATCH] FileChangelogDB.java, JEChangelogDB.java: Synchronized the two files for easier code identification. Code is really too much duplicated between the two implementations.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 84 ++++++++++++---------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 62 ++++++++++++++++----
2 files changed, 85 insertions(+), 61 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 6013bb6..b83a4f1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -106,7 +106,6 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
-
private final AtomicBoolean shutdown = new AtomicBoolean();
static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
@@ -322,13 +321,11 @@
if (indexer != null)
{
indexer.initiateShutdown();
- indexer.interrupt();
}
final ChangelogDBPurger purger = cnPurger.getAndSet(null);
if (purger != null)
{
purger.initiateShutdown();
- purger.interrupt();
}
try
@@ -641,6 +638,8 @@
final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
}
+ // recycle exhausted cursors,
+ // because client code will not manage the cursors itself
return new CompositeDBCursor<Void>(cursors, true);
}
@@ -762,17 +761,14 @@
oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
if (oldestNotPurgedCSN == null)
{ // shutdown may have been initiated...
- if (!isShutdownInitiated())
- {
- // ... or the change number index DB is empty,
- // wait for new changes to come in.
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
- // Note we cannot sleep for as long as the purge delay
- // (3 days default), because we might receive late updates
- // that will have to be purged before the purge delay elapses.
- // This can particularly happen in case of network partitions.
- sleep(DEFAULT_SLEEP);
- }
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ jeFriendlySleep(DEFAULT_SLEEP);
continue;
}
}
@@ -787,7 +783,7 @@
latestPurgeDate = purgeTimestamp;
- sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
}
catch (InterruptedException e)
{
@@ -804,6 +800,33 @@
}
}
+ /**
+ * This method implements a sleep() that is friendly to Berkeley JE.
+ * <p>
+ * Originally, {@link Thread#sleep(long)} was used , but waking up a
+ * sleeping threads required calling {@link Thread#interrupt()}, and JE
+ * threw exceptions when invoked on interrupted threads.
+ * <p>
+ * The solution is to replace:
+ * <ol>
+ * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
+ * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
+ * </ol>
+ */
+ private void jeFriendlySleep(long millis) throws InterruptedException
+ {
+ if (!isShutdownInitiated())
+ {
+ synchronized (this)
+ {
+ if (!isShutdownInitiated())
+ {
+ wait(millis);
+ }
+ }
+ }
+ }
+
private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
{
final long nextPurgeTime = notPurgedCSN.getTime();
@@ -816,5 +839,16 @@
// wait a bit before purging more
return DEFAULT_SLEEP;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public void initiateShutdown()
+ {
+ super.initiateShutdown();
+ synchronized (this)
+ {
+ notify(); // wake up the purger thread for faster shutdown
+ }
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fb9cad9..2a9d974 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -79,8 +79,7 @@
* concurrent shutdown.
*/
private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
- domainToReplicaDBs =
- new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+ domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
private ReplicationDbEnv dbEnv;
private ReplicationServerCfg config;
private final File dbDirectory;
@@ -92,8 +91,7 @@
* @GuardedBy("cnIndexDBLock")
*/
private JEChangeNumberIndexDB cnIndexDB;
- private final AtomicReference<ChangeNumberIndexer> cnIndexer =
- new AtomicReference<ChangeNumberIndexer>();
+ private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>();
/** Used for protecting {@link ChangeNumberIndexDB} related state. */
private final Object cnIndexDBLock = new Object();
@@ -103,8 +101,7 @@
* older than this delay might be removed.
*/
private long purgeDelayInMillis;
- private final AtomicReference<ChangelogDBPurger> cnPurger =
- new AtomicReference<ChangelogDBPurger>();
+ private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
private volatile long latestPurgeDate;
/** The local replication server. */
@@ -161,7 +158,7 @@
private File makeDir(String dbDirName) throws ConfigException
{
// Check that this path exists or create it.
- File dbDirectory = getFileForPath(dbDirName);
+ final File dbDirectory = getFileForPath(dbDirName);
try
{
if (!dbDirectory.exists())
@@ -223,7 +220,7 @@
* the baseDN for which to create a ReplicaDB
* @param serverId
* the serverId for which to create a ReplicaDB
- * @param rs
+ * @param server
* the ReplicationServer
* @return a Pair with the JEReplicaDB and a boolean indicating whether it had
* to be created
@@ -231,14 +228,14 @@
* if a problem occurred with the database
*/
Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
- int serverId, ReplicationServer rs) throws ChangelogException
+ int serverId, ReplicationServer server) throws ChangelogException
{
while (!shutdown.get())
{
final ConcurrentMap<Integer, JEReplicaDB> domainMap =
getExistingOrNewDomainMap(baseDN);
final Pair<JEReplicaDB, Boolean> result =
- getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
+ getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
if (result != null)
{
return result;
@@ -276,7 +273,7 @@
private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
- DN baseDN, ReplicationServer rs) throws ChangelogException
+ DN baseDN, ReplicationServer server) throws ChangelogException
{
// happy path: the JEReplicaDB already exists
JEReplicaDB currentValue = domainMap.get(serverId);
@@ -298,16 +295,16 @@
if (domainToReplicaDBs.get(baseDN) != domainMap)
{
- // the domainMap could have been concurrently removed because
+ // The domainMap could have been concurrently removed because
// 1) a shutdown was initiated or 2) an initialize was called.
// Return will allow the code to:
// 1) shutdown properly or 2) lazily recreate the JEReplicaDB
return null;
}
- final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
- domainMap.put(serverId, newValue);
- return Pair.of(newValue, true);
+ final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+ domainMap.put(serverId, newDB);
+ return Pair.of(newDB, true);
}
}
@@ -320,7 +317,7 @@
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
final ChangelogState changelogState = dbEnv.readChangelogState();
- initializeChangelogState(changelogState);
+ initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
startIndexer(changelogState);
@@ -330,24 +327,21 @@
catch (ChangelogException e)
{
if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
-
- logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(),
- e.getLocalizedMessage()));
+ }
+ logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
}
}
- private void initializeChangelogState(final ChangelogState changelogState)
+ private void initializeToChangelogState(final ChangelogState changelogState)
throws ChangelogException
{
- for (Map.Entry<DN, Long> entry :
- changelogState.getDomainToGenerationId().entrySet())
+ for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
{
- replicationServer.getReplicationServerDomain(entry.getKey(), true)
- .initGenerationID(entry.getValue());
+ replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
}
- for (Map.Entry<DN, List<Integer>> entry :
- changelogState.getDomainToServerIds().entrySet())
+ for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
for (int serverId : entry.getValue())
{
@@ -491,7 +485,9 @@
firstException = e;
}
else if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
cnIndexDB = null;
@@ -618,7 +614,7 @@
/** {@inheritDoc} */
@Override
- public void setPurgeDelay(long purgeDelayInMillis)
+ public void setPurgeDelay(final long purgeDelayInMillis)
{
this.purgeDelayInMillis = purgeDelayInMillis;
final ChangelogDBPurger purger;
@@ -642,7 +638,7 @@
/** {@inheritDoc} */
@Override
- public void setComputeChangeNumber(boolean computeChangeNumber)
+ public void setComputeChangeNumber(final boolean computeChangeNumber)
throws ChangelogException
{
if (computeChangeNumber)
@@ -661,8 +657,7 @@
private void startIndexer(final ChangelogState changelogState)
{
- final ChangeNumberIndexer indexer =
- new ChangeNumberIndexer(this, changelogState);
+ final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
if (cnIndexer.compareAndSet(null, indexer))
{
indexer.start();
@@ -671,7 +666,7 @@
/** {@inheritDoc} */
@Override
- public long getDomainLatestTrimDate(DN baseDN)
+ public long getDomainLatestTrimDate(final DN baseDN)
{
return latestPurgeDate;
}
@@ -708,17 +703,15 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
- ServerState startAfterServerState) throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
+ throws ChangelogException
{
final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final Map<DBCursor<UpdateMsg>, Void> cursors =
- new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
+ final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
for (int serverId : serverIds)
{
// get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState != null ?
- startAfterServerState.getCSN(serverId) : null;
+ final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
}
// recycle exhausted cursors,
@@ -728,8 +721,8 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
- CSN startAfterCSN) throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
+ throws ChangelogException
{
JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
@@ -763,7 +756,7 @@
/** {@inheritDoc} */
@Override
- public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException
+ public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
{
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
@@ -784,8 +777,7 @@
/** {@inheritDoc} */
@Override
- public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
- throws ChangelogException
+ public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
{
dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
@@ -797,7 +789,7 @@
/**
* The thread purging the changelogDB on a regular interval. Records are
- * purged from the changelogDB is they are older than a delay specified in
+ * purged from the changelogDB if they are older than a delay specified in
* seconds. The purge process works in two steps:
* <ol>
* <li>first purge the changeNumberIndexDB and retrieve information to drive
@@ -858,8 +850,7 @@
}
}
- for (final Map<Integer, JEReplicaDB> domainMap
- : domainToReplicaDBs.values())
+ for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
{
for (final JEReplicaDB replicaDB : domainMap.values())
{
@@ -877,8 +868,7 @@
}
catch (Exception e)
{
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
if (replicationServer != null)
{
replicationServer.shutdown();
--
Gitblit v1.10.0