From 6531523345efd08282b1454dc06e572ae9eff848 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 244 ++++++++++++++++++++++++++++++------------------
1 files changed, 150 insertions(+), 94 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 723b682..4ebee02 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,10 +29,10 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
@@ -76,13 +76,20 @@
* <li>then check it's not null</li>
* <li>then close all inside</li>
* </ol>
- * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+ * When creating a replicaDB, synchronize on the domainMap to avoid
* concurrent shutdown.
*/
- private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
- domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
- private ReplicationDbEnv dbEnv;
- private ReplicationServerCfg config;
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+ /**
+ * \@GuardedBy("itself")
+ */
+ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+ new HashMap<DN, List<DomainDBCursor>>();
+ private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+ new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private ReplicationDbEnv replicationEnv;
+ private final ReplicationServerCfg config;
private final File dbDirectory;
/**
@@ -107,9 +114,9 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
- private AtomicBoolean shutdown = new AtomicBoolean();
+ private final AtomicBoolean shutdown = new AtomicBoolean();
- private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+ private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
new DBCursor<UpdateMsg>()
{
@@ -139,7 +146,7 @@
};
/**
- * Builds an instance of this class.
+ * Creates a new changelog DB.
*
* @param replicationServer
* the local replication server.
@@ -148,15 +155,15 @@
* @throws ConfigException
* if a problem occurs opening the supplied directory
*/
- public JEChangelogDB(ReplicationServer replicationServer,
- ReplicationServerCfg config) throws ConfigException
+ public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+ throws ConfigException
{
this.config = config;
this.replicationServer = replicationServer;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
- private File makeDir(String dbDirName) throws ConfigException
+ private File makeDir(final String dbDirName) throws ConfigException
{
// Check that this path exists or create it.
final File dbDirectory = getFileForPath(dbDirName);
@@ -173,12 +180,9 @@
if (debugEnabled())
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- final MessageBuilder mb = new MessageBuilder();
- mb.append(e.getLocalizedMessage());
- mb.append(" ");
- mb.append(String.valueOf(dbDirectory));
- Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
- throw new ConfigException(msg, e);
+ final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
+ .append(String.valueOf(dbDirectory));
+ throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
}
}
@@ -223,35 +227,42 @@
* the serverId for which to create a ReplicaDB
* @param server
* the ReplicationServer
- * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
- * to be created
+ * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
* @throws ChangelogException
* if a problem occurred with the database
*/
- Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
- int serverId, ReplicationServer server) throws ChangelogException
+ Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+ final ReplicationServer server) throws ChangelogException
{
while (!shutdown.get())
{
- final ConcurrentMap<Integer, JEReplicaDB> domainMap =
- getExistingOrNewDomainMap(baseDN);
- final Pair<JEReplicaDB, Boolean> result =
- getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+ final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
if (result != null)
{
+ final Boolean dbWasCreated = result.getSecond();
+ if (dbWasCreated)
+ { // new replicaDB => update all cursors with it
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (DomainDBCursor cursor : cursors)
+ {
+ cursor.addReplicaDB(serverId, null);
+ }
+ }
+ }
+
return result;
}
}
- throw new ChangelogException(
- ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+ throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
- private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
- DN baseDN)
+ private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
{
// happy path: the domainMap already exists
- final ConcurrentMap<Integer, JEReplicaDB> currentValue =
- domainToReplicaDBs.get(baseDN);
+ final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
if (currentValue != null)
{
return currentValue;
@@ -260,30 +271,33 @@
// unlucky, the domainMap does not exist: take the hit and create the
// newValue, even though the same could be done concurrently by another
// thread
- final ConcurrentMap<Integer, JEReplicaDB> newValue =
- new ConcurrentHashMap<Integer, JEReplicaDB>();
- final ConcurrentMap<Integer, JEReplicaDB> previousValue =
- domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+ final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+ final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
if (previousValue != null)
{
// there was already a value associated to the key, let's use it
return previousValue;
}
+
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
return newValue;
}
- private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
- final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
- DN baseDN, ReplicationServer server) throws ChangelogException
+ private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+ final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
{
- // happy path: the JEReplicaDB already exists
+ // happy path: the replicaDB already exists
JEReplicaDB currentValue = domainMap.get(serverId);
if (currentValue != null)
{
return Pair.of(currentValue, false);
}
- // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+ // unlucky, the replicaDB does not exist: take the hit and synchronize
// on the domainMap to create a new ReplicaDB
synchronized (domainMap)
{
@@ -299,11 +313,11 @@
// The domainMap could have been concurrently removed because
// 1) a shutdown was initiated or 2) an initialize was called.
// Return will allow the code to:
- // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+ // 1) shutdown properly or 2) lazily recreate the replicaDB
return null;
}
- final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+ final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
domainMap.put(serverId, newDB);
return Pair.of(newDB, true);
}
@@ -316,8 +330,8 @@
try
{
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
- dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
- final ChangelogState changelogState = dbEnv.getChangelogState();
+ replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+ final ChangelogState changelogState = replicationEnv.getChangelogState();
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
@@ -351,7 +365,7 @@
}
}
- private void shutdownCNIndexDB() throws ChangelogException
+ private void shutdownChangeNumberIndexDB() throws ChangelogException
{
synchronized (cnIndexDBLock)
{
@@ -389,7 +403,7 @@
try
{
- shutdownCNIndexDB();
+ shutdownChangeNumberIndexDB();
}
catch (ChangelogException e)
{
@@ -410,7 +424,7 @@
}
}
- if (dbEnv != null)
+ if (replicationEnv != null)
{
// wait for shutdown of the threads holding cursors
try
@@ -429,7 +443,7 @@
// do nothing: we are already shutting down
}
- dbEnv.shutdown();
+ replicationEnv.shutdown();
}
if (firstException != null)
@@ -439,11 +453,10 @@
}
/**
- * Clears all content from the changelog database, but leaves its directory on
- * the filesystem.
+ * Clears all records from the changelog (does not remove the changelog itself).
*
* @throws ChangelogException
- * If a database problem happened
+ * If an error occurs when clearing the changelog.
*/
public void clearDB() throws ChangelogException
{
@@ -477,7 +490,7 @@
try
{
- shutdownCNIndexDB();
+ shutdownChangeNumberIndexDB();
}
catch (ChangelogException e)
{
@@ -593,7 +606,7 @@
// 3- clear the changelogstate DB
try
{
- dbEnv.clearGenerationId(baseDN);
+ replicationEnv.clearGenerationId(baseDN);
}
catch (ChangelogException e)
{
@@ -644,7 +657,7 @@
{
if (computeChangeNumber)
{
- startIndexer(dbEnv.getChangelogState());
+ startIndexer(replicationEnv.getChangelogState());
}
else
{
@@ -682,7 +695,7 @@
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+ cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
}
catch (Exception e)
{
@@ -704,75 +717,118 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
- throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
{
- final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
- final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
- for (int serverId : serverIds)
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ registeredMultiDomainCursors.add(cursor);
+ for (DN baseDN : domainToReplicaDBs.keySet())
{
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
- final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
- replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
- cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+ cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
}
- // recycle exhausted cursors,
- // because client code will not manage the cursors itself
- return new CompositeDBCursor<Void>(cursors, true);
+ return cursor;
}
- private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
- ServerState startAfterServerState)
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+ throws ChangelogException
{
- final ServerState domainState = offlineReplicas.getServerState(baseDN);
- if (domainState != null)
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ for (int serverId : getDomainMap(baseDN).keySet())
{
- for (CSN offlineCSN : domainState)
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ cursor.addReplicaDB(serverId, lastCSN);
+ }
+ return cursor;
+ }
+
+ private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ {
+ synchronized (registeredDomainCursors)
+ {
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors == null)
{
- if (serverId == offlineCSN.getServerId()
- && !startAfterServerState.cover(offlineCSN))
- {
- return offlineCSN;
- }
+ cursors = new ArrayList<DomainDBCursor>();
+ registeredDomainCursors.put(baseDN, cursors);
}
+ cursors.add(cursor);
+ return cursor;
+ }
+ }
+
+ private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+ {
+ final MultiDomainServerState offlineReplicas =
+ replicationEnv.getChangelogState().getOfflineReplicas();
+ final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+ if (offlineCSN != null
+ && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+ {
+ return offlineCSN;
}
return null;
}
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
throws ChangelogException
{
JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- return replicaDB.generateCursorFrom(startAfterCSN);
+ final DBCursor<UpdateMsg> cursor =
+ replicaDB.generateCursorFrom(startAfterCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ // TODO JNR if (offlineCSN != null) ??
+ // What about replicas that suddenly become offline?
+ return new ReplicaOfflineCursor(cursor, offlineCSN);
}
- return EMPTY_CURSOR;
+ return EMPTY_CURSOR_REPLICA_DB;
}
/** {@inheritDoc} */
@Override
- public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
- throws ChangelogException
+ public void unregisterCursor(final DBCursor<?> cursor)
{
- final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
- updateMsg.getCSN().getServerId(), replicationServer);
- final JEReplicaDB replicaDB = pair.getFirst();
- final boolean wasCreated = pair.getSecond();
+ if (cursor instanceof MultiDomainDBCursor)
+ {
+ registeredMultiDomainCursors.remove(cursor);
+ }
+ else if (cursor instanceof DomainDBCursor)
+ {
+ final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+ synchronized (registeredMultiDomainCursors)
+ {
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ }
+ /** {@inheritDoc} */
+ @Override
+ public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+ {
+ final CSN csn = updateMsg.getCSN();
+ final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+ csn.getServerId(), replicationServer);
+ final JEReplicaDB replicaDB = pair.getFirst();
replicaDB.add(updateMsg);
+
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
- notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+ notifyReplicaOnline(indexer, baseDN, csn.getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
- return wasCreated;
+ return pair.getSecond(); // replica DB was created
}
/** {@inheritDoc} */
@@ -792,7 +848,7 @@
{
if (indexer.isReplicaOffline(baseDN, serverId))
{
- dbEnv.notifyReplicaOnline(baseDN, serverId);
+ replicationEnv.notifyReplicaOnline(baseDN, serverId);
}
}
@@ -800,7 +856,7 @@
@Override
public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
{
- dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
+ replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
--
Gitblit v1.10.0