From d454b5f9a2b7dc4ef2a70cd983a26436568cbe04 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 13:52:43 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java        |  169 +++++++++++++++++++++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java          |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java                    |   76 ++--------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java    |    8 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java |   47 ++++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java           |   45 +++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java                |   59 ++++----
 7 files changed, 254 insertions(+), 153 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed243c4..07f2371 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@
  */
 package org.opends.server.replication.server;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
@@ -294,22 +295,19 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          NavigableSet<ReplicaDBCursor> sortedCursors = null;
+          ReplicaDBCursor cursor = null;
           try
           {
-            sortedCursors = collectAllCursorsWithChanges();
-
             // fill the lateQueue
-            while (!sortedCursors.isEmpty()
-                && lateQueue.count() < 100
-                && lateQueue.bytesCount() < 50000)
+            cursor = replicationServerDomain.getCursorFrom(serverState);
+            while (cursor.next() && isLateQueueBelowThreshold())
             {
-              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+              lateQueue.add(cursor.getChange());
             }
           }
           finally
           {
-            close(sortedCursors);
+            close(cursor);
           }
 
           /*
@@ -403,34 +401,9 @@
     return null;
   }
 
-  private UpdateMsg nextOldestUpdateMsg(
-      NavigableSet<ReplicaDBCursor> sortedCursors)
+  private boolean isLateQueueBelowThreshold()
   {
-    /*
-     * The cursors are sorted based on the currentChange of each cursor to
-     * consider the next change across all servers.
-     * To keep consistent the order of the cursors in the SortedSet,
-     * it is necessary to remove and eventually add again a cursor (after moving
-     * it forward).
-     */
-    final ReplicaDBCursor cursor = sortedCursors.pollFirst();
-    final UpdateMsg result = cursor.getChange();
-    cursor.next();
-    addCursorIfNotEmpty(sortedCursors, cursor);
-    return result;
-  }
-
-  private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
-      ReplicaDBCursor cursor)
-  {
-    if (cursor.getChange() != null)
-    {
-      cursors.add(cursor);
-    }
-    else
-    {
-      close(cursor);
-    }
+    return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
   }
 
   /**
@@ -476,12 +449,12 @@
 
   private CSN findOldestCSNFromReplicaDBs()
   {
-    SortedSet<ReplicaDBCursor> sortedCursors = null;
+    ReplicaDBCursor cursor = null;
     try
     {
-      sortedCursors = collectAllCursorsWithChanges();
-      UpdateMsg msg = sortedCursors.first().getChange();
-      return msg.getCSN();
+      cursor = replicationServerDomain.getCursorFrom(serverState);
+      cursor.next();
+      return cursor.getChange().getCSN();
     }
     catch (Exception e)
     {
@@ -489,32 +462,11 @@
     }
     finally
     {
-      close(sortedCursors);
+      close(cursor);
     }
   }
 
   /**
-   * Collects all the {@link ReplicaDBCursor}s that have changes and sort them
-   * with the oldest {@link CSN} first.
-   *
-   * @return a List of cursors with changes sorted by their {@link CSN}
-   *         (oldest first)
-   */
-  private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
-  {
-    final NavigableSet<ReplicaDBCursor> results =
-        new TreeSet<ReplicaDBCursor>();
-    for (int serverId : replicationServerDomain.getServerIds())
-    {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCsn = serverState.getCSN(serverId);
-      addCursorIfNotEmpty(results,
-          replicationServerDomain.getCursorFrom(serverId, lastCsn));
-    }
-    return results;
-  }
-
-  /**
    * Get the count of updates sent to this server.
    * @return  The count of update sent to this server.
    */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 9c8c0c2..f8a9cac 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -79,8 +79,7 @@
  * <p>
  * Currently are only implemented the create and restore backup features.
  */
-public class ReplicationBackend
-       extends Backend
+public class ReplicationBackend extends Backend
 {
   private static final String CHANGE_NUMBER = "replicationChangeNumber";
 
@@ -620,43 +619,41 @@
    * Exports or returns all the changes from a ReplicationServerDomain coming
    * after the CSN specified in the searchOperation.
    */
-  private void writeChangesAfterCSN(ReplicationServerDomain rsd,
+  private void writeChangesAfterCSN(ReplicationServerDomain rsDomain,
       final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
       SearchOperation searchOperation, final CSN previousCSN)
   {
-    for (int serverId : rsd.getServerIds())
+    if (exportConfig != null && exportConfig.isCancelled())
+    { // Abort if cancelled
+      return;
+    }
+
+    ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN);
+    try
     {
-      if (exportConfig != null && exportConfig.isCancelled())
-      { // Abort if cancelled
-        return;
-      }
+      int lookthroughCount = 0;
 
-      ReplicaDBCursor cursor = rsd.getCursorFrom(serverId, previousCSN);
-      try
+      // Walk through the changes
+      cursor.next(); // first try to advance the cursor
+      while (cursor.getChange() != null)
       {
-        int lookthroughCount = 0;
-
-        // Walk through the changes
-        while (cursor.getChange() != null)
-        {
-          if (exportConfig != null && exportConfig.isCancelled())
-          { // abort if cancelled
-            return;
-          }
-          if (!canContinue(searchOperation, lookthroughCount))
-          {
-            break;
-          }
-          lookthroughCount++;
-          writeChange(cursor.getChange(), ldifWriter, searchOperation,
-              rsd.getBaseDN(), exportConfig != null);
-          cursor.next();
+        if (exportConfig != null && exportConfig.isCancelled())
+        { // abort if cancelled
+          return;
         }
+        if (!canContinue(searchOperation, lookthroughCount))
+        {
+          break;
+        }
+        lookthroughCount++;
+        writeChange(cursor.getChange(), ldifWriter, searchOperation,
+            rsDomain.getBaseDN(), exportConfig != null);
+        cursor.next();
       }
-      finally
-      {
-        close(cursor);
-      }
+    }
+    finally
+    {
+      close(cursor);
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index fd57424..a3a9043 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1269,19 +1269,7 @@
   }
 
   /**
-   * Returns a set containing the serverIds that produced updates and known by
-   * this replicationServer from all over the topology, whether directly
-   * connected or connected to another RS.
-   *
-   * @return a set containing the serverIds known by this replicationServer.
-   */
-  public Set<Integer> getServerIds()
-  {
-    return domainDB.getDomainServerIds(baseDN);
-  }
-
-  /**
-   * Creates and returns a cursor.
+   * Creates and returns a cursor across this replication domain.
    * <p>
    * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
    * to the next available record.
@@ -1290,16 +1278,35 @@
    * {@link ReplicaDBCursor#close()} method to free the resources and locks used
    * by the cursor.
    *
-   * @param serverId
-   *          Identifier of the server for which the cursor is created
    * @param startAfterCSN
    *          Starting point for the cursor. If null, start from the oldest CSN
    * @return a non null {@link ReplicaDBCursor}
-   * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN)
+   * @see ReplicationDomainDB#getCursorFrom(DN, CSN)
    */
-  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
+  public ReplicaDBCursor getCursorFrom(CSN startAfterCSN)
   {
-    return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+    return domainDB.getCursorFrom(baseDN, startAfterCSN);
+  }
+
+  /**
+   * Creates and returns a cursor across this replication domain.
+   * <p>
+   * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
+   * to the next available record.
+   * <p>
+   * When the cursor is not used anymore, client code MUST call the
+   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+   * by the cursor.
+   *
+   * @param startAfterServerState
+   *          Starting point for the replicaDB cursors. If null, start from the
+   *          oldest CSN
+   * @return a non null {@link ReplicaDBCursor} going from oldest to newest CSN
+   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
+   */
+  public ReplicaDBCursor getCursorFrom(ServerState startAfterServerState)
+  {
+    return domainDB.getCursorFrom(baseDN, startAfterServerState);
   }
 
  /**
@@ -2720,7 +2727,7 @@
    */
   public void storeReceivedCTHeartbeat(CSN csn)
   {
-    // TODO:May be we can spare processing by only storing CSN (timestamp)
+    // TODO:Maybe we can spare processing by only storing CSN (timestamp)
     // instead of a server state.
     getChangeTimeHeartbeatState().update(csn);
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 8e5b22f..bc57fbe 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,6 @@
  */
 package org.opends.server.replication.server.changelog.api;
 
-import java.util.Set;
-
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -41,16 +39,6 @@
 {
 
   /**
-   * Returns the serverIds for the servers that are or have been part of the
-   * provided replication domain.
-   *
-   * @param baseDN
-   *          the replication domain baseDN
-   * @return an unmodifiable set of integers holding the serverIds
-   */
-  Set<Integer> getDomainServerIds(DN baseDN);
-
-  /**
    * Get the number of changes for the specified replication domain.
    *
    * @param baseDN
@@ -171,8 +159,9 @@
   long getCount(DN baseDN, int serverId, CSN from, CSN to);
 
   /**
-   * Generates a {@link ReplicaDBCursor} for the specified serverId and
-   * replication domain starting after the provided CSN.
+   * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
+   * specified replication domain, with all cursors starting after the provided
+   * CSN.
    * <p>
    * The cursor is already advanced to the record after startAfterCSN.
    * <p>
@@ -182,13 +171,35 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @param serverId
-   *          Identifier of the server for which the cursor is created
    * @param startAfterCSN
-   *          Starting point for the cursor. If null, start from the oldest CSN
+   *          Starting point for each ReplicaDB cursor. If null, start from the
+   *          oldest CSN for each ReplicaDB cursor.
    * @return a non null {@link ReplicaDBCursor}
+   * @see #getCursorFrom(DN, ServerState)
    */
-  ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
+  ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN);
+
+  /**
+   * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
+   * specified replication domain starting after the provided
+   * {@link ServerState} for each replicaDBs.
+   * <p>
+   * The cursor is already advanced to the records after the serverState.
+   * <p>
+   * When the cursor is not used anymore, client code MUST call the
+   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+   * by the cursor.
+   *
+   * @param baseDN
+   *          the replication domain baseDN
+   * @param startAfterServerState
+   *          Starting point for each ReplicaDB cursor. If any CSN for a
+   *          replicaDB is null, then start from the oldest CSN for this
+   *          replicaDB
+   * @return a non null {@link ReplicaDBCursor}
+   * @see #getCursorFrom(DN, CSN)
+   */
+  ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState);
 
   /**
    * for the specified serverId and replication domain.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 21d4426..de85252 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opends.messages.Message;
@@ -57,6 +54,114 @@
 {
 
   /**
+   * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
+   * replication domain, advancing from the oldest to the newest change cross
+   * all replicaDBs.
+   */
+  private final class CrossReplicaDBCursor implements ReplicaDBCursor
+  {
+
+    private UpdateMsg currentChange;
+    /**
+     * The cursors are sorted based on the current change of each cursor to
+     * consider the next change across all replicaDBs.
+     */
+    private final NavigableSet<ReplicaDBCursor> cursors =
+        new TreeSet<ReplicaDBCursor>();
+    private final DN baseDN;
+
+    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
+    {
+      this.baseDN = baseDN;
+      for (int serverId : getDomainMap(baseDN).keySet())
+      {
+        // get the last already sent CSN from that server to get a cursor
+        final CSN lastCSN = startAfterServerState.getCSN(serverId);
+        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
+      }
+    }
+
+    private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
+        CSN startAfterCSN)
+    {
+      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+      if (replicaDB != null)
+      {
+        try
+        {
+          ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
+          cursor.next();
+          return cursor;
+        }
+        catch (ChangelogException e)
+        {
+          // ignored
+        }
+      }
+      return EMPTY_CURSOR;
+    }
+
+    @Override
+    public boolean next()
+    {
+      if (cursors.isEmpty())
+      {
+        currentChange = null;
+        return false;
+      }
+
+      // To keep consistent the cursors' order in the SortedSet, it is necessary
+      // to remove and eventually add again a cursor (after moving it forward).
+      final ReplicaDBCursor cursor = cursors.pollFirst();
+      currentChange = cursor.getChange();
+      cursor.next();
+      addCursorIfNotEmpty(cursor);
+      return true;
+    }
+
+    void addCursorIfNotEmpty(ReplicaDBCursor cursor)
+    {
+      if (cursor.getChange() != null)
+      {
+        cursors.add(cursor);
+      }
+      else
+      {
+        StaticUtils.close(cursor);
+      }
+    }
+
+    @Override
+    public UpdateMsg getChange()
+    {
+      return currentChange;
+    }
+
+    @Override
+    public void close()
+    {
+      StaticUtils.close(cursors);
+    }
+
+    @Override
+    public int compareTo(ReplicaDBCursor o)
+    {
+      final CSN csn1 = getChange().getCSN();
+      final CSN csn2 = o.getChange().getCSN();
+
+      return CSN.compare(csn1, csn2);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return getClass().getSimpleName() + " baseDN=" + baseDN
+          + " currentChange=" + currentChange + " open cursors=" + cursors;
+    }
+  }
+
+  /**
    * This map contains the List of updates received from each LDAP server.
    */
   private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
@@ -109,6 +214,12 @@
     {
       // empty
     }
+
+    @Override
+    public String toString()
+    {
+      return "EmptyReplicaDBCursor";
+    }
   };
 
   /**
@@ -368,13 +479,6 @@
 
   /** {@inheritDoc} */
   @Override
-  public Set<Integer> getDomainServerIds(DN baseDN)
-  {
-    return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
-  }
-
-  /** {@inheritDoc} */
-  @Override
   public long getCount(DN baseDN, int serverId, CSN from, CSN to)
   {
     JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -565,24 +669,45 @@
 
   /** {@inheritDoc} */
   @Override
-  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
-      CSN startAfterCSN)
+  public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
   {
-    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
-    if (replicaDB != null)
+    // Builds a new serverState for all the serverIds in the replication domain
+    // to ensure we get cursors starting after the provided CSN.
+    return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN));
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ReplicaDBCursor getCursorFrom(DN baseDN,
+      ServerState startAfterServerState)
+  {
+    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+  }
+
+  private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
+  {
+    final ServerState result = new ServerState();
+    if (startAfterCSN == null)
     {
-      try
+      return result;
+    }
+
+    for (int serverId : getDomainMap(baseDN).keySet())
+    {
+      if (serverId == startAfterCSN.getServerId())
       {
-        ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
-        cursor.next();
-        return cursor;
+        // reuse the provided CSN one as it is the most accurate
+        result.update(startAfterCSN);
       }
-      catch (ChangelogException e)
+      else
       {
-        // ignored
+        // build a new CSN, ignoring the seqNum since it is irrelevant for
+        // a different serverId
+        final CSN csn = startAfterCSN; // only used for increased readability
+        result.update(new CSN(csn.getTime(), 0, serverId));
       }
     }
-    return EMPTY_CURSOR;
+    return result;
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 4cf8caa..2424239 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -584,7 +584,8 @@
   @Override
   public String toString()
   {
-    return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN;
+    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
+        + oldestCSN + " " + newestCSN;
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index f2ecd4d..03355df 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -166,4 +166,12 @@
 
     return CSN.compare(csn1, csn2);
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " currentChange=" + currentChange + ""
+        + replicaDB;
+  }
 }

--
Gitblit v1.10.0