From a670c31c686f19c2e8619602de6518cde032f61f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Sep 2013 21:41:58 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  235 +++++++++++++++++-----------------------------------------
 1 files changed, 68 insertions(+), 167 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index c4e4eee..eef08e4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -46,9 +47,9 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
-import org.opends.server.replication.server.changelog.je.DbHandler;
 import org.opends.server.types.*;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -117,17 +118,26 @@
   private final Queue<MessageHandler> otherHandlers =
     new ConcurrentLinkedQueue<MessageHandler>();
 
-  /**
-   * This map contains the List of updates received from each LDAP server.
-   */
-  private final Map<Integer, DbHandler> sourceDbHandlers =
-      new ConcurrentHashMap<Integer, DbHandler>();
+  private final ChangelogDB changelogDB;
   /** The ReplicationServer that created the current instance. */
   private ReplicationServer localReplicationServer;
 
-  /** GenerationId management. */
+  /**
+   * The generationId of the current replication domain. The generationId is
+   * computed by hashing the first 1000 entries in the DB.
+   */
   private volatile long generationId = -1;
-  private boolean generationIdSavedStatus = false;
+  /**
+   * JNR, this is legacy code, hard to follow logic. I think what this field
+   * tries to say is: "is the generationId in use anywhere?", i.e. is there a
+   * replication topology in place? As soon as an answer to any of these
+   * question comes true, then it is set to true.
+   * <p>
+   * It looks like the only use of this field is to prevent the
+   * {@link #generationId} from being reset by
+   * {@link #resetGenerationIdIfPossible()}.
+   */
+  private volatile boolean generationIdSavedStatus = false;
 
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
@@ -177,6 +187,7 @@
     this.assuredTimeoutTimer = new Timer("Replication server RS("
         + localReplicationServer.getServerId()
         + ") assured timer for domain \"" + baseDn + "\"", true);
+    this.changelogDB = localReplicationServer.getChangelogDB();
 
     DirectoryServer.registerMonitorProvider(this);
   }
@@ -252,7 +263,7 @@
       }
     }
 
-    if (!publishMessage(update, serverId))
+    if (!publishUpdateMsg(update, serverId))
     {
       return;
     }
@@ -390,43 +401,46 @@
     }
   }
 
-  private boolean publishMessage(UpdateMsg update, int serverId)
+  private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId)
   {
-    // look for the dbHandler that is responsible for the LDAP server which
-    // generated the change.
-    DbHandler dbHandler;
-    synchronized (sourceDbHandlers)
+    try
     {
-      dbHandler = sourceDbHandlers.get(serverId);
-      if (dbHandler == null)
+      if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
       {
-        try
-        {
-          dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
-          generationIdSavedStatus = true;
-        } catch (ChangelogException e)
+        /*
+         * JNR: Matt and I had a hard time figuring out where to put this
+         * synchronized block. We elected to put it here, but without a strong
+         * conviction.
+         */
+        synchronized (generationIDLock)
         {
           /*
-           * Because of database problem we can't save any more changes
-           * from at least one LDAP server.
-           * This replicationServer therefore can't do it's job properly anymore
-           * and needs to close all its connections and shutdown itself.
+           * JNR: I think the generationIdSavedStatus is set to true because
+           * method above created a ReplicaDB which assumes the generationId was
+           * communicated to another server. Hence setting true on this field
+           * prevent the generationId from being reset.
            */
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(" ");
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          localReplicationServer.shutdown();
-          return false;
+          generationIdSavedStatus = true;
         }
-        sourceDbHandlers.put(serverId, dbHandler);
       }
+      return true;
     }
-
-    // Publish the messages to the source handler
-    dbHandler.add(update);
-    return true;
+    catch (ChangelogException e)
+    {
+      /*
+       * Because of database problem we can't save any more changes from at
+       * least one LDAP server. This replicationServer therefore can't do it's
+       * job properly anymore and needs to close all its connections and
+       * shutdown itself.
+       */
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+      mb.append(" ");
+      mb.append(stackTraceToSingleLineString(e));
+      logError(mb.toMessage());
+      localReplicationServer.shutdown();
+      return false;
+    }
   }
 
   private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
@@ -1261,7 +1275,7 @@
    */
   public Set<Integer> getServerIds()
   {
-    return sourceDbHandlers.keySet();
+    return changelogDB.getDomainServerIds(baseDn);
   }
 
   /**
@@ -1278,29 +1292,7 @@
    */
   public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
   {
-    DbHandler dbHandler = sourceDbHandlers.get(serverId);
-    if (dbHandler == null)
-    {
-      return null;
-    }
-
-    ReplicaDBCursor cursor;
-    try
-    {
-      cursor = dbHandler.generateCursorFrom(startAfterCSN);
-    }
-    catch (Exception e)
-    {
-      return null;
-    }
-
-    if (!cursor.next())
-    {
-      close(cursor);
-      return null;
-    }
-
-    return cursor;
+    return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
   }
 
  /**
@@ -1313,12 +1305,7 @@
   */
   public long getCount(int serverId, CSN from, CSN to)
   {
-    DbHandler dbHandler = sourceDbHandlers.get(serverId);
-    if (dbHandler != null)
-    {
-      return dbHandler.getCount(from, to);
-    }
-    return 0;
+    return changelogDB.getCount(baseDn, serverId, from, to);
   }
 
   /**
@@ -1328,12 +1315,7 @@
    */
   public long getChangesCount()
   {
-    long entryCount = 0;
-    for (DbHandler dbHandler : sourceDbHandlers.values())
-    {
-      entryCount += dbHandler.getChangesCount();
-    }
-    return entryCount;
+    return changelogDB.getDomainChangesCount(baseDn);
   }
 
   /**
@@ -1346,24 +1328,6 @@
   }
 
   /**
-   * Sets the provided DbHandler associated to the provided serverId.
-   *
-   * @param serverId  the serverId for the server to which is
-   *                  associated the DbHandler.
-   * @param dbHandler the dbHandler associated to the serverId.
-   *
-   * @throws ChangelogException If a database error happened.
-   */
-  public void setDbHandler(int serverId, DbHandler dbHandler)
-    throws ChangelogException
-  {
-    synchronized (sourceDbHandlers)
-    {
-      sourceDbHandlers.put(serverId, dbHandler);
-    }
-  }
-
-  /**
    * Retrieves the destination handlers for a routable message.
    *
    * @param msg The message to route.
@@ -1734,20 +1698,7 @@
 
     stopAllServers(true);
 
-    shutdownDbHandlers();
-  }
-
-  /** Shutdown all the dbHandlers. */
-  private void shutdownDbHandlers()
-  {
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
-      {
-        dbHandler.shutdown();
-      }
-      sourceDbHandlers.clear();
-    }
+    changelogDB.shutdownDomain(baseDn);
   }
 
   /**
@@ -1758,9 +1709,9 @@
   public ServerState getDbServerState()
   {
     ServerState serverState = new ServerState();
-    for (DbHandler db : sourceDbHandlers.values())
+    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
     {
-      serverState.update(db.getLastChange());
+      serverState.update(lastCSN);
     }
     return serverState;
   }
@@ -2235,24 +2186,7 @@
   public void clearDbs()
   {
     // Reset the localchange and state db for the current domain
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
-      {
-        try
-        {
-          dbHandler.clear();
-        } catch (Exception e)
-        {
-          // TODO: i18n
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
-              e.getMessage() + " " + stackTraceToSingleLineString(e)));
-          logError(mb.toMessage());
-        }
-      }
-      shutdownDbHandlers();
-    }
+    changelogDB.clearDomain(baseDn);
     try
     {
       localReplicationServer.clearGenerationId(baseDn);
@@ -2397,20 +2331,6 @@
   }
 
   /**
-   * Set the purge delay on all the db Handlers for this Domain
-   * of Replication.
-   *
-   * @param delay The new purge delay to use.
-   */
-  public void setPurgeDelay(long delay)
-  {
-    for (DbHandler dbHandler : sourceDbHandlers.values())
-    {
-      dbHandler.setPurgeDelay(delay);
-    }
-  }
-
-  /**
    * Get the map of connected DSs.
    * @return The map of connected DSs
    */
@@ -2667,7 +2587,6 @@
     {
       for (int serverId : dbState)
       {
-        DbHandler h = sourceDbHandlers.get(serverId);
         CSN mostRecentDbCSN = dbState.getCSN(serverId);
         try {
           // Is the most recent change in the Db newer than eligible CSN ?
@@ -2676,19 +2595,8 @@
           if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
           {
             // let's try to seek the first change <= eligibleCSN
-            ReplicaDBCursor cursor = null;
-            try {
-              cursor = h.generateCursorFrom(eligibleCSN);
-              if (cursor != null && cursor.getChange() != null) {
-                CSN newCSN = cursor.getChange().getCSN();
-                result.update(newCSN);
-              }
-            } catch (ChangelogException e) {
-              // there's no change older than eligibleCSN (case of s3/csn31)
-              result.update(new CSN(0, 0, serverId));
-            } finally {
-              close(cursor);
-            }
+            CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
+            result.update(newCSN);
           } else {
             // for this serverId, all changes in the ChangelogDb are holder
             // than eligibleCSN, the most recent in the db is our guy.
@@ -2721,9 +2629,9 @@
   public ServerState getStartState()
   {
     ServerState domainStartState = new ServerState();
-    for (DbHandler dbHandler : sourceDbHandlers.values())
+    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
     {
-      domainStartState.update(dbHandler.getFirstChange());
+      domainStartState.update(firstCSN);
     }
     return domainStartState;
   }
@@ -2741,10 +2649,12 @@
   {
     CSN eligibleCSN = null;
 
-    for (DbHandler db : sourceDbHandlers.values())
+    for (Entry<Integer, CSN> entry :
+      changelogDB.getDomainLastCSNs(baseDn).entrySet())
     {
       // Consider this producer (DS/db).
-      int serverId = db.getServerId();
+      final int serverId = entry.getKey();
+      final CSN changelogLastCSN = entry.getValue();
 
       // Should it be considered for eligibility ?
       CSN heartbeatLastCSN =
@@ -2774,7 +2684,6 @@
         continue;
       }
 
-      CSN changelogLastCSN = db.getLastChange();
       if (changelogLastCSN != null
           && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
       {
@@ -2935,15 +2844,7 @@
    */
   public long getLatestDomainTrimDate()
   {
-    long latest = 0;
-    for (DbHandler db : sourceDbHandlers.values())
-    {
-      if (latest == 0 || latest < db.getLatestTrimDate())
-      {
-        latest = db.getLatestTrimDate();
-      }
-    }
-    return latest;
+    return changelogDB.getDomainLatestTrimDate(baseDn);
   }
 
   /**

--
Gitblit v1.10.0