From dc20d17584703e0736ef3982bc0dc18b4d11bb37 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 13:41:47 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4244) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java |   48 +++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7025753..af4cbb8 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +50,7 @@
 import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
 import org.opends.server.replication.server.changelog.je.DomainDBCursor;
 import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
+import org.opends.server.replication.server.changelog.je.ReplicaCursor;
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
@@ -94,6 +95,8 @@
       new HashMap<DN, List<DomainDBCursor>>();
   private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
       new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
   private ReplicationEnvironment replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -714,16 +717,28 @@
   /** {@inheritDoc} */
   @Override
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
-      PositionStrategy positionStrategy) throws ChangelogException
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
     final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
       final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
-      // TODO JNR if (offlineCSN != null) ??
-      // What about replicas that suddenly become offline?
-      return new ReplicaOfflineCursor(cursor, offlineCSN);
+      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+      synchronized (replicaCursors)
+      {
+        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+        if (cursors == null)
+        {
+          cursors = new ArrayList<ReplicaCursor>();
+          replicaCursors.put(replicaID, cursors);
+        }
+        cursors.add(replicaCursor);
+      }
+
+      return replicaCursor;
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
@@ -748,6 +763,15 @@
         }
       }
     }
+    else if (cursor instanceof ReplicaCursor)
+    {
+      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+      if (cursors != null)
+      {
+        cursors.remove(cursor);
+      }
+    }
   }
 
   /** {@inheritDoc} */
@@ -788,6 +812,7 @@
     {
       replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
+    updateCursorsWithOfflineCSN(baseDN, serverId, null);
   }
 
   /** {@inheritDoc} */
@@ -800,6 +825,19 @@
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
+    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
+  }
+
+  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
+  {
+    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+    if (cursors != null)
+    {
+      for (ReplicaCursor cursor : cursors)
+      {
+        cursor.setOfflineCSN(offlineCSN);
+      }
+    }
   }
 
   /**

--
Gitblit v1.10.0