From 6531523345efd08282b1454dc06e572ae9eff848 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |  244 ++++++++++++++++++++++++++++++------------------
 1 files changed, 150 insertions(+), 94 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 723b682..4ebee02 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,10 +29,10 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
@@ -76,13 +76,20 @@
    * <li>then check it's not null</li>
    * <li>then close all inside</li>
    * </ol>
-   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+   * When creating a replicaDB, synchronize on the domainMap to avoid
    * concurrent shutdown.
    */
-  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
-      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
-  private ReplicationDbEnv dbEnv;
-  private ReplicationServerCfg config;
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private ReplicationDbEnv replicationEnv;
+  private final ReplicationServerCfg config;
   private final File dbDirectory;
 
   /**
@@ -107,9 +114,9 @@
 
   /** The local replication server. */
   private final ReplicationServer replicationServer;
-  private AtomicBoolean shutdown = new AtomicBoolean();
+  private final AtomicBoolean shutdown = new AtomicBoolean();
 
-  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
       new DBCursor<UpdateMsg>()
   {
 
@@ -139,7 +146,7 @@
   };
 
   /**
-   * Builds an instance of this class.
+   * Creates a new changelog DB.
    *
    * @param replicationServer
    *          the local replication server.
@@ -148,15 +155,15 @@
    * @throws ConfigException
    *           if a problem occurs opening the supplied directory
    */
-  public JEChangelogDB(ReplicationServer replicationServer,
-      ReplicationServerCfg config) throws ConfigException
+  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+      throws ConfigException
   {
     this.config = config;
     this.replicationServer = replicationServer;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
-  private File makeDir(String dbDirName) throws ConfigException
+  private File makeDir(final String dbDirName) throws ConfigException
   {
     // Check that this path exists or create it.
     final File dbDirectory = getFileForPath(dbDirName);
@@ -173,12 +180,9 @@
       if (debugEnabled())
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
 
-      final MessageBuilder mb = new MessageBuilder();
-      mb.append(e.getLocalizedMessage());
-      mb.append(" ");
-      mb.append(String.valueOf(dbDirectory));
-      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
-      throw new ConfigException(msg, e);
+      final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
+          .append(String.valueOf(dbDirectory));
+      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
     }
   }
 
@@ -223,35 +227,42 @@
    *          the serverId for which to create a ReplicaDB
    * @param server
    *          the ReplicationServer
-   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
-   *         to be created
+   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
    * @throws ChangelogException
    *           if a problem occurred with the database
    */
-  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
-      int serverId, ReplicationServer server) throws ChangelogException
+  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+      final ReplicationServer server) throws ChangelogException
   {
     while (!shutdown.get())
     {
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
-          getExistingOrNewDomainMap(baseDN);
-      final Pair<JEReplicaDB, Boolean> result =
-          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
       if (result != null)
       {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
         return result;
       }
     }
-    throw new ChangelogException(
-        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
   }
 
-  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
-      DN baseDN)
+  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
   {
     // happy path: the domainMap already exists
-    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
-        domainToReplicaDBs.get(baseDN);
+    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
     if (currentValue != null)
     {
       return currentValue;
@@ -260,30 +271,33 @@
     // unlucky, the domainMap does not exist: take the hit and create the
     // newValue, even though the same could be done concurrently by another
     // thread
-    final ConcurrentMap<Integer, JEReplicaDB> newValue =
-        new ConcurrentHashMap<Integer, JEReplicaDB>();
-    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
-        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
     if (previousValue != null)
     {
       // there was already a value associated to the key, let's use it
       return previousValue;
     }
+
+    // we just created a new domain => update all cursors
+    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    {
+      cursor.addDomain(baseDN, null);
+    }
     return newValue;
   }
 
-  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
-      DN baseDN, ReplicationServer server) throws ChangelogException
+  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    // happy path: the JEReplicaDB already exists
+    // happy path: the replicaDB already exists
     JEReplicaDB currentValue = domainMap.get(serverId);
     if (currentValue != null)
     {
       return Pair.of(currentValue, false);
     }
 
-    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
@@ -299,11 +313,11 @@
         // The domainMap could have been concurrently removed because
         // 1) a shutdown was initiated or 2) an initialize was called.
         // Return will allow the code to:
-        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
         return null;
       }
 
-      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
       domainMap.put(serverId, newDB);
       return Pair.of(newDB, true);
     }
@@ -316,8 +330,8 @@
     try
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
-      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
-      final ChangelogState changelogState = dbEnv.getChangelogState();
+      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+      final ChangelogState changelogState = replicationEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
@@ -351,7 +365,7 @@
     }
   }
 
-  private void shutdownCNIndexDB() throws ChangelogException
+  private void shutdownChangeNumberIndexDB() throws ChangelogException
   {
     synchronized (cnIndexDBLock)
     {
@@ -389,7 +403,7 @@
 
     try
     {
-      shutdownCNIndexDB();
+      shutdownChangeNumberIndexDB();
     }
     catch (ChangelogException e)
     {
@@ -410,7 +424,7 @@
       }
     }
 
-    if (dbEnv != null)
+    if (replicationEnv != null)
     {
       // wait for shutdown of the threads holding cursors
       try
@@ -429,7 +443,7 @@
         // do nothing: we are already shutting down
       }
 
-      dbEnv.shutdown();
+      replicationEnv.shutdown();
     }
 
     if (firstException != null)
@@ -439,11 +453,10 @@
   }
 
   /**
-   * Clears all content from the changelog database, but leaves its directory on
-   * the filesystem.
+   * Clears all records from the changelog (does not remove the changelog itself).
    *
    * @throws ChangelogException
-   *           If a database problem happened
+   *           If an error occurs when clearing the changelog.
    */
   public void clearDB() throws ChangelogException
   {
@@ -477,7 +490,7 @@
 
         try
         {
-          shutdownCNIndexDB();
+          shutdownChangeNumberIndexDB();
         }
         catch (ChangelogException e)
         {
@@ -593,7 +606,7 @@
     // 3- clear the changelogstate DB
     try
     {
-      dbEnv.clearGenerationId(baseDN);
+      replicationEnv.clearGenerationId(baseDN);
     }
     catch (ChangelogException e)
     {
@@ -644,7 +657,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(dbEnv.getChangelogState());
+      startIndexer(replicationEnv.getChangelogState());
     }
     else
     {
@@ -682,7 +695,7 @@
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+          cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
         }
         catch (Exception e)
         {
@@ -704,75 +717,118 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
-      throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
   {
-    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
-    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
-    for (int serverId : serverIds)
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
-      replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
-      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
     }
-    // recycle exhausted cursors,
-    // because client code will not manage the cursors itself
-    return new CompositeDBCursor<Void>(cursors, true);
+    return cursor;
   }
 
-  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
-      ServerState startAfterServerState)
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+      throws ChangelogException
   {
-    final ServerState domainState = offlineReplicas.getServerState(baseDN);
-    if (domainState != null)
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    for (int serverId : getDomainMap(baseDN).keySet())
     {
-      for (CSN offlineCSN : domainState)
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
       {
-        if (serverId == offlineCSN.getServerId()
-            && !startAfterServerState.cover(offlineCSN))
-        {
-          return offlineCSN;
-        }
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
       }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
     }
     return null;
   }
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
       throws ChangelogException
   {
     JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      return replicaDB.generateCursorFrom(startAfterCSN);
+      final DBCursor<UpdateMsg> cursor =
+          replicaDB.generateCursorFrom(startAfterCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+      // TODO JNR if (offlineCSN != null) ??
+      // What about replicas that suddenly become offline?
+      return new ReplicaOfflineCursor(cursor, offlineCSN);
     }
-    return EMPTY_CURSOR;
+    return EMPTY_CURSOR_REPLICA_DB;
   }
 
   /** {@inheritDoc} */
   @Override
-  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
-      throws ChangelogException
+  public void unregisterCursor(final DBCursor<?> cursor)
   {
-    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
-        updateMsg.getCSN().getServerId(), replicationServer);
-    final JEReplicaDB replicaDB = pair.getFirst();
-    final boolean wasCreated = pair.getSecond();
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
 
+  /** {@inheritDoc} */
+  @Override
+  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+  {
+    final CSN csn = updateMsg.getCSN();
+    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+        csn.getServerId(), replicationServer);
+    final JEReplicaDB replicaDB = pair.getFirst();
     replicaDB.add(updateMsg);
+
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
-      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
-    return wasCreated;
+    return pair.getSecond(); // replica DB was created
   }
 
   /** {@inheritDoc} */
@@ -792,7 +848,7 @@
   {
     if (indexer.isReplicaOffline(baseDN, serverId))
     {
-      dbEnv.notifyReplicaOnline(baseDN, serverId);
+      replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
   }
 
@@ -800,7 +856,7 @@
   @Override
   public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
   {
-    dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
+    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {

--
Gitblit v1.10.0