From 39845070920c859cd1d24cb23090bfa1bfad7b1a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 18 Aug 2014 15:26:23 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |  265 +++++++++++++++++++++++++++++++----------------------
 1 files changed, 155 insertions(+), 110 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 7e75bf1..dc23a7e 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -40,6 +41,7 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
@@ -72,13 +74,20 @@
    * <li>then check it's not null</li>
    * <li>then close all inside</li>
    * </ol>
-   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+   * When creating a replicaDB, synchronize on the domainMap to avoid
    * concurrent shutdown.
    */
-  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
-      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
-  private ReplicationDbEnv dbEnv;
-  private ReplicationServerCfg config;
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private ReplicationDbEnv replicationEnv;
+  private final ReplicationServerCfg config;
   private final File dbDirectory;
 
   /**
@@ -103,9 +112,9 @@
 
   /** The local replication server. */
   private final ReplicationServer replicationServer;
-  private AtomicBoolean shutdown = new AtomicBoolean();
+  private final AtomicBoolean shutdown = new AtomicBoolean();
 
-  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
       new DBCursor<UpdateMsg>()
   {
 
@@ -135,7 +144,7 @@
   };
 
   /**
-   * Builds an instance of this class.
+   * Creates a new changelog DB.
    *
    * @param replicationServer
    *          the local replication server.
@@ -144,15 +153,15 @@
    * @throws ConfigException
    *           if a problem occurs opening the supplied directory
    */
-  public JEChangelogDB(ReplicationServer replicationServer,
-      ReplicationServerCfg config) throws ConfigException
+  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+      throws ConfigException
   {
     this.config = config;
     this.replicationServer = replicationServer;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
-  private File makeDir(String dbDirName) throws ConfigException
+  private File makeDir(final String dbDirName) throws ConfigException
   {
     // Check that this path exists or create it.
     final File dbDirectory = getFileForPath(dbDirName);
@@ -168,15 +177,13 @@
     {
       logger.traceException(e);
 
-      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
-      mb.append(e.getLocalizedMessage());
-      mb.append(" ");
-      mb.append(dbDirectory);
-      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e);
+      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
+          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
+      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
     }
   }
 
-  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
+  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
   {
     final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
     if (domainMap != null)
@@ -186,29 +193,12 @@
     return Collections.emptyMap();
   }
 
-  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
+  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
   {
     return getDomainMap(baseDN).get(serverId);
   }
 
   /**
-   * Provision resources for the specified serverId in the specified replication
-   * domain.
-   *
-   * @param baseDN
-   *          the replication domain where to add the serverId
-   * @param serverId
-   *          the server Id to add to the replication domain
-   * @throws ChangelogException
-   *           If a database error happened.
-   */
-  private void commission(DN baseDN, int serverId, ReplicationServer rs)
-      throws ChangelogException
-  {
-    getOrCreateReplicaDB(baseDN, serverId, rs);
-  }
-
-  /**
    * Returns a {@link JEReplicaDB}, possibly creating it.
    *
    * @param baseDN
@@ -217,35 +207,42 @@
    *          the serverId for which to create a ReplicaDB
    * @param server
    *          the ReplicationServer
-   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
-   *         to be created
+   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
    * @throws ChangelogException
    *           if a problem occurred with the database
    */
-  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
-      int serverId, ReplicationServer server) throws ChangelogException
+  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+      final ReplicationServer server) throws ChangelogException
   {
     while (!shutdown.get())
     {
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
-          getExistingOrNewDomainMap(baseDN);
-      final Pair<JEReplicaDB, Boolean> result =
-          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
       if (result != null)
       {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
         return result;
       }
     }
-    throw new ChangelogException(
-        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
   }
 
-  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
-      DN baseDN)
+  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
   {
     // happy path: the domainMap already exists
-    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
-        domainToReplicaDBs.get(baseDN);
+    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
     if (currentValue != null)
     {
       return currentValue;
@@ -254,30 +251,36 @@
     // unlucky, the domainMap does not exist: take the hit and create the
     // newValue, even though the same could be done concurrently by another
     // thread
-    final ConcurrentMap<Integer, JEReplicaDB> newValue =
-        new ConcurrentHashMap<Integer, JEReplicaDB>();
-    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
-        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
     if (previousValue != null)
     {
       // there was already a value associated to the key, let's use it
       return previousValue;
     }
+
+    if (MultimasterReplication.isECLEnabledDomain(baseDN))
+    {
+      // we just created a new domain => update all cursors
+      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+      {
+        cursor.addDomain(baseDN, null);
+      }
+    }
     return newValue;
   }
 
-  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
-      DN baseDN, ReplicationServer server) throws ChangelogException
+  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    // happy path: the JEReplicaDB already exists
+    // happy path: the replicaDB already exists
     JEReplicaDB currentValue = domainMap.get(serverId);
     if (currentValue != null)
     {
       return Pair.of(currentValue, false);
     }
 
-    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
@@ -293,11 +296,11 @@
         // The domainMap could have been concurrently removed because
         // 1) a shutdown was initiated or 2) an initialize was called.
         // Return will allow the code to:
-        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
         return null;
       }
 
-      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
       domainMap.put(serverId, newDB);
       return Pair.of(newDB, true);
     }
@@ -310,8 +313,8 @@
     try
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
-      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
-      final ChangelogState changelogState = dbEnv.getChangelogState();
+      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+      final ChangelogState changelogState = replicationEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
@@ -338,12 +341,12 @@
     {
       for (int serverId : entry.getValue())
       {
-        commission(entry.getKey(), serverId, replicationServer);
+        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
       }
     }
   }
 
-  private void shutdownCNIndexDB() throws ChangelogException
+  private void shutdownChangeNumberIndexDB() throws ChangelogException
   {
     synchronized (cnIndexDBLock)
     {
@@ -381,7 +384,7 @@
 
     try
     {
-      shutdownCNIndexDB();
+      shutdownChangeNumberIndexDB();
     }
     catch (ChangelogException e)
     {
@@ -402,7 +405,7 @@
       }
     }
 
-    if (dbEnv != null)
+    if (replicationEnv != null)
     {
       // wait for shutdown of the threads holding cursors
       try
@@ -421,7 +424,7 @@
         // do nothing: we are already shutting down
       }
 
-      dbEnv.shutdown();
+      replicationEnv.shutdown();
     }
 
     if (firstException != null)
@@ -431,11 +434,10 @@
   }
 
   /**
-   * Clears all content from the changelog database, but leaves its directory on
-   * the filesystem.
+   * Clears all records from the changelog (does not remove the changelog itself).
    *
    * @throws ChangelogException
-   *           If a database problem happened
+   *           If an error occurs when clearing the changelog.
    */
   public void clearDB() throws ChangelogException
   {
@@ -469,7 +471,7 @@
 
         try
         {
-          shutdownCNIndexDB();
+          shutdownChangeNumberIndexDB();
         }
         catch (ChangelogException e)
         {
@@ -584,7 +586,7 @@
     // 3- clear the changelogstate DB
     try
     {
-      dbEnv.clearGenerationId(baseDN);
+      replicationEnv.clearGenerationId(baseDN);
     }
     catch (ChangelogException e)
     {
@@ -635,7 +637,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(dbEnv.getChangelogState());
+      startIndexer(replicationEnv.getChangelogState());
     }
     else
     {
@@ -673,7 +675,7 @@
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
         }
         catch (Exception e)
         {
@@ -694,40 +696,57 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
-      throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
   {
-    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
-    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
-    for (int serverId : serverIds)
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
-      replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
-      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
     }
-    // recycle exhausted cursors,
-    // because client code will not manage the cursors itself
-    return new CompositeDBCursor<Void>(cursors, true);
+    return cursor;
   }
 
-  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
-      ServerState startAfterServerState)
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+      throws ChangelogException
   {
-    final ServerState domainState = offlineReplicas.getServerState(baseDN);
-    if (domainState != null)
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    for (int serverId : getDomainMap(baseDN).keySet())
     {
-      for (CSN offlineCSN : domainState)
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
       {
-        if (serverId == offlineCSN.getServerId()
-            && !startAfterServerState.cover(offlineCSN))
-        {
-          return offlineCSN;
-        }
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
       }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
     }
     return null;
   }
@@ -737,31 +756,57 @@
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
       throws ChangelogException
   {
-    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      return replicaDB.generateCursorFrom(startAfterCSN);
+      final DBCursor<UpdateMsg> cursor =
+          replicaDB.generateCursorFrom(startAfterCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+      // TODO JNR if (offlineCSN != null) ??
+      // What about replicas that suddenly become offline?
+      return new ReplicaOfflineCursor(cursor, offlineCSN);
     }
-    return EMPTY_CURSOR;
+    return EMPTY_CURSOR_REPLICA_DB;
   }
 
   /** {@inheritDoc} */
   @Override
-  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
-      throws ChangelogException
+  public void unregisterCursor(final DBCursor<?> cursor)
   {
-    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
-        updateMsg.getCSN().getServerId(), replicationServer);
-    final JEReplicaDB replicaDB = pair.getFirst();
-    final boolean wasCreated = pair.getSecond();
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
 
+  /** {@inheritDoc} */
+  @Override
+  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+  {
+    final CSN csn = updateMsg.getCSN();
+    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+        csn.getServerId(), replicationServer);
+    final JEReplicaDB replicaDB = pair.getFirst();
     replicaDB.add(updateMsg);
+
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
-    return wasCreated;
+    return pair.getSecond(); // replica DB was created
   }
 
   /** {@inheritDoc} */
@@ -779,7 +824,7 @@
   @Override
   public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
   {
-    dbEnv.addOfflineReplica(baseDN, offlineCSN);
+    replicationEnv.addOfflineReplica(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {

--
Gitblit v1.10.0