From 938dda347b7223b73a1c5d5c47c8674ecdd90102 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 14:04:42 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4244) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 53 ++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 44 insertions(+), 9 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 5fd16da..0b2de60 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -87,6 +88,8 @@
new HashMap<DN, List<DomainDBCursor>>();
private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
private ReplicationDbEnv replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -720,7 +723,7 @@
return cursor;
}
- private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
{
synchronized (registeredDomainCursors)
{
@@ -751,21 +754,31 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
- PositionStrategy positionStrategy) throws ChangelogException
-
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
+ " is not supported for the JE implementation fo changelog");
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- final DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startCSN);
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
- // TODO JNR if (offlineCSN != null) ??
- // What about replicas that suddenly become offline?
- return new ReplicaOfflineCursor(cursor, offlineCSN);
+ final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+ final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+ synchronized (replicaCursors)
+ {
+ List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+ if (cursors == null)
+ {
+ cursors = new ArrayList<ReplicaCursor>();
+ replicaCursors.put(replicaID, cursors);
+ }
+ cursors.add(replicaCursor);
+ }
+
+ return replicaCursor;
}
return EMPTY_CURSOR_REPLICA_DB;
}
@@ -790,6 +803,15 @@
}
}
}
+ else if (cursor instanceof ReplicaCursor)
+ {
+ final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+ final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
}
/** {@inheritDoc} */
@@ -831,6 +853,19 @@
{
indexer.replicaOffline(baseDN, offlineCSN);
}
+ updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+ }
+
+ private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+ {
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (ReplicaCursor cursor : cursors)
+ {
+ cursor.setOfflineCSN(offlineCSN);
+ }
+ }
}
/**
--
Gitblit v1.10.0