From b6ccb560e9056cc9c028812f5f63ff2e80c95c87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java |  161 +++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 117 insertions(+), 44 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7592fd9..a194d6d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -45,7 +46,8 @@
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
+import org.opends.server.replication.server.changelog.je.DomainDBCursor;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
 import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
@@ -64,6 +66,7 @@
  */
 public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
 {
+  /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
   /**
@@ -77,11 +80,18 @@
    * <li>then check it's not null</li>
    * <li>then close all inside</li>
    * </ol>
-   * When creating a FileReplicaDB, synchronize on the domainMap to avoid
+   * When creating a replicaDB, synchronize on the domainMap to avoid
    * concurrent shutdown.
    */
-  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>>
-      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
   private ReplicationEnvironment replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -124,10 +134,10 @@
    *           if a problem occurs opening the supplied directory
    */
   public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
-     throws ConfigException
+      throws ConfigException
   {
-    this.replicationServer = replicationServer;
     this.config = config;
+    this.replicationServer = replicationServer;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
@@ -175,8 +185,7 @@
    *          the serverId for which to create a ReplicaDB
    * @param server
    *          the ReplicationServer
-   * @return a Pair with the FileReplicaDB and a boolean indicating whether it had
-   *         to be created
+   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
    * @throws ChangelogException
    *           if a problem occurred with the database
    */
@@ -189,6 +198,19 @@
       final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
       if (result != null)
       {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
         return result;
       }
     }
@@ -214,20 +236,26 @@
       // there was already a value associated to the key, let's use it
       return previousValue;
     }
+
+    // we just created a new domain => update all cursors
+    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    {
+      cursor.addDomain(baseDN, null);
+    }
     return newValue;
   }
 
   private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
       final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    // happy path: the FileReplicaDB already exists
+    // happy path: the replicaDB already exists
     FileReplicaDB currentValue = domainMap.get(serverId);
     if (currentValue != null)
     {
       return Pair.of(currentValue, false);
     }
 
-    // unlucky, the FileReplicaDB does not exist: take the hit and synchronize
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
@@ -242,7 +270,7 @@
         // The domainMap could have been concurrently removed because
         // 1) a shutdown was initiated or 2) an initialize was called.
         // Return will allow the code to:
-        // 1) shutdown properly or 2) lazily recreate the FileReplicaDB
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
         return null;
       }
 
@@ -371,6 +399,7 @@
       {
         // do nothing: we are already shutting down
       }
+
       replicationEnv.shutdown();
     }
 
@@ -381,10 +410,10 @@
   }
 
   /**
-   * Clears all records from the changelog (does not remove the log itself).
+   * Clears all records from the changelog (does not remove the changelog itself).
    *
    * @throws ChangelogException
-   *           If an error occurs when clearing the log.
+   *           If an error occurs when clearing the changelog.
    */
   public void clearDB() throws ChangelogException
   {
@@ -629,40 +658,57 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
-      throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
   {
-    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
-    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
-    for (int serverId : serverIds)
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
-      replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
-      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
     }
-    // recycle exhausted cursors,
-    // because client code will not manage the cursors itself
-    return new CompositeDBCursor<Void>(cursors, true);
+    return cursor;
   }
 
-  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
-      ServerState startAfterServerState)
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+      throws ChangelogException
   {
-    final ServerState domainState = offlineReplicas.getServerState(baseDN);
-    if (domainState != null)
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    for (int serverId : getDomainMap(baseDN).keySet())
     {
-      for (CSN offlineCSN : domainState)
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
       {
-        if (serverId == offlineCSN.getServerId()
-            && !startAfterServerState.cover(offlineCSN))
-        {
-          return offlineCSN;
-        }
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
       }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
     }
     return null;
   }
@@ -675,28 +721,55 @@
     final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      return replicaDB.generateCursorFrom(startAfterCSN);
+      final DBCursor<UpdateMsg> cursor =
+          replicaDB.generateCursorFrom(startAfterCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+      // TODO JNR if (offlineCSN != null) ??
+      // What about replicas that suddenly become offline?
+      return new ReplicaOfflineCursor(cursor, offlineCSN);
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
 
   /** {@inheritDoc} */
   @Override
+  public void unregisterCursor(final DBCursor<?> cursor)
+  {
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
   {
+    final CSN csn = updateMsg.getCSN();
     final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
-        updateMsg.getCSN().getServerId(), replicationServer);
+        csn.getServerId(), replicationServer);
     final FileReplicaDB replicaDB = pair.getFirst();
-    final boolean wasCreated = pair.getSecond();
-
     replicaDB.add(updateMsg);
+
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
-      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
-    return wasCreated;
+    return pair.getSecond(); // replica DB was created
   }
 
   /** {@inheritDoc} */

--
Gitblit v1.10.0