From b6ccb560e9056cc9c028812f5f63ff2e80c95c87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 161 +++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 117 insertions(+), 44 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7592fd9..a194d6d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,7 +46,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
+import org.opends.server.replication.server.changelog.je.DomainDBCursor;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
@@ -64,6 +66,7 @@
*/
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
+ /** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
/**
@@ -77,11 +80,18 @@
* <li>then check it's not null</li>
* <li>then close all inside</li>
* </ol>
- * When creating a FileReplicaDB, synchronize on the domainMap to avoid
+ * When creating a replicaDB, synchronize on the domainMap to avoid
* concurrent shutdown.
*/
- private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>>
- domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+ /**
+ * \@GuardedBy("itself")
+ */
+ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+ new HashMap<DN, List<DomainDBCursor>>();
+ private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+ new CopyOnWriteArrayList<MultiDomainDBCursor>();
private ReplicationEnvironment replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -124,10 +134,10 @@
* if a problem occurs opening the supplied directory
*/
public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
- throws ConfigException
+ throws ConfigException
{
- this.replicationServer = replicationServer;
this.config = config;
+ this.replicationServer = replicationServer;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
@@ -175,8 +185,7 @@
* the serverId for which to create a ReplicaDB
* @param server
* the ReplicationServer
- * @return a Pair with the FileReplicaDB and a boolean indicating whether it had
- * to be created
+ * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
* @throws ChangelogException
* if a problem occurred with the database
*/
@@ -189,6 +198,19 @@
final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
if (result != null)
{
+ final Boolean dbWasCreated = result.getSecond();
+ if (dbWasCreated)
+ { // new replicaDB => update all cursors with it
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (DomainDBCursor cursor : cursors)
+ {
+ cursor.addReplicaDB(serverId, null);
+ }
+ }
+ }
+
return result;
}
}
@@ -214,20 +236,26 @@
// there was already a value associated to the key, let's use it
return previousValue;
}
+
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
return newValue;
}
private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
{
- // happy path: the FileReplicaDB already exists
+ // happy path: the replicaDB already exists
FileReplicaDB currentValue = domainMap.get(serverId);
if (currentValue != null)
{
return Pair.of(currentValue, false);
}
- // unlucky, the FileReplicaDB does not exist: take the hit and synchronize
+ // unlucky, the replicaDB does not exist: take the hit and synchronize
// on the domainMap to create a new ReplicaDB
synchronized (domainMap)
{
@@ -242,7 +270,7 @@
// The domainMap could have been concurrently removed because
// 1) a shutdown was initiated or 2) an initialize was called.
// Return will allow the code to:
- // 1) shutdown properly or 2) lazily recreate the FileReplicaDB
+ // 1) shutdown properly or 2) lazily recreate the replicaDB
return null;
}
@@ -371,6 +399,7 @@
{
// do nothing: we are already shutting down
}
+
replicationEnv.shutdown();
}
@@ -381,10 +410,10 @@
}
/**
- * Clears all records from the changelog (does not remove the log itself).
+ * Clears all records from the changelog (does not remove the changelog itself).
*
* @throws ChangelogException
- * If an error occurs when clearing the log.
+ * If an error occurs when clearing the changelog.
*/
public void clearDB() throws ChangelogException
{
@@ -629,40 +658,57 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
- throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
{
- final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
- final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
- for (int serverId : serverIds)
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ registeredMultiDomainCursors.add(cursor);
+ for (DN baseDN : domainToReplicaDBs.keySet())
{
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
- final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
- replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
- cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+ cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
}
- // recycle exhausted cursors,
- // because client code will not manage the cursors itself
- return new CompositeDBCursor<Void>(cursors, true);
+ return cursor;
}
- private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
- ServerState startAfterServerState)
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+ throws ChangelogException
{
- final ServerState domainState = offlineReplicas.getServerState(baseDN);
- if (domainState != null)
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ for (int serverId : getDomainMap(baseDN).keySet())
{
- for (CSN offlineCSN : domainState)
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ cursor.addReplicaDB(serverId, lastCSN);
+ }
+ return cursor;
+ }
+
+ private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ {
+ synchronized (registeredDomainCursors)
+ {
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors == null)
{
- if (serverId == offlineCSN.getServerId()
- && !startAfterServerState.cover(offlineCSN))
- {
- return offlineCSN;
- }
+ cursors = new ArrayList<DomainDBCursor>();
+ registeredDomainCursors.put(baseDN, cursors);
}
+ cursors.add(cursor);
+ return cursor;
+ }
+ }
+
+ private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+ {
+ final MultiDomainServerState offlineReplicas =
+ replicationEnv.getChangelogState().getOfflineReplicas();
+ final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+ if (offlineCSN != null
+ && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+ {
+ return offlineCSN;
}
return null;
}
@@ -675,28 +721,55 @@
final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- return replicaDB.generateCursorFrom(startAfterCSN);
+ final DBCursor<UpdateMsg> cursor =
+ replicaDB.generateCursorFrom(startAfterCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ // TODO JNR if (offlineCSN != null) ??
+ // What about replicas that suddenly become offline?
+ return new ReplicaOfflineCursor(cursor, offlineCSN);
}
return EMPTY_CURSOR_REPLICA_DB;
}
/** {@inheritDoc} */
@Override
+ public void unregisterCursor(final DBCursor<?> cursor)
+ {
+ if (cursor instanceof MultiDomainDBCursor)
+ {
+ registeredMultiDomainCursors.remove(cursor);
+ }
+ else if (cursor instanceof DomainDBCursor)
+ {
+ final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+ synchronized (registeredMultiDomainCursors)
+ {
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
{
+ final CSN csn = updateMsg.getCSN();
final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
- updateMsg.getCSN().getServerId(), replicationServer);
+ csn.getServerId(), replicationServer);
final FileReplicaDB replicaDB = pair.getFirst();
- final boolean wasCreated = pair.getSecond();
-
replicaDB.add(updateMsg);
+
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
- notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+ notifyReplicaOnline(indexer, baseDN, csn.getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
- return wasCreated;
+ return pair.getSecond(); // replica DB was created
}
/** {@inheritDoc} */
--
Gitblit v1.10.0