From d454b5f9a2b7dc4ef2a70cd983a26436568cbe04 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 13:52:43 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |  169 +++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 147 insertions(+), 22 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 21d4426..de85252 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opends.messages.Message;
@@ -57,6 +54,114 @@
 {
 
   /**
+   * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
+   * replication domain, advancing from the oldest to the newest change cross
+   * all replicaDBs.
+   */
+  private final class CrossReplicaDBCursor implements ReplicaDBCursor
+  {
+
+    private UpdateMsg currentChange;
+    /**
+     * The cursors are sorted based on the current change of each cursor to
+     * consider the next change across all replicaDBs.
+     */
+    private final NavigableSet<ReplicaDBCursor> cursors =
+        new TreeSet<ReplicaDBCursor>();
+    private final DN baseDN;
+
+    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
+    {
+      this.baseDN = baseDN;
+      for (int serverId : getDomainMap(baseDN).keySet())
+      {
+        // get the last already sent CSN from that server to get a cursor
+        final CSN lastCSN = startAfterServerState.getCSN(serverId);
+        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
+      }
+    }
+
+    private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
+        CSN startAfterCSN)
+    {
+      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+      if (replicaDB != null)
+      {
+        try
+        {
+          ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
+          cursor.next();
+          return cursor;
+        }
+        catch (ChangelogException e)
+        {
+          // ignored
+        }
+      }
+      return EMPTY_CURSOR;
+    }
+
+    @Override
+    public boolean next()
+    {
+      if (cursors.isEmpty())
+      {
+        currentChange = null;
+        return false;
+      }
+
+      // To keep consistent the cursors' order in the SortedSet, it is necessary
+      // to remove and eventually add again a cursor (after moving it forward).
+      final ReplicaDBCursor cursor = cursors.pollFirst();
+      currentChange = cursor.getChange();
+      cursor.next();
+      addCursorIfNotEmpty(cursor);
+      return true;
+    }
+
+    void addCursorIfNotEmpty(ReplicaDBCursor cursor)
+    {
+      if (cursor.getChange() != null)
+      {
+        cursors.add(cursor);
+      }
+      else
+      {
+        StaticUtils.close(cursor);
+      }
+    }
+
+    @Override
+    public UpdateMsg getChange()
+    {
+      return currentChange;
+    }
+
+    @Override
+    public void close()
+    {
+      StaticUtils.close(cursors);
+    }
+
+    @Override
+    public int compareTo(ReplicaDBCursor o)
+    {
+      final CSN csn1 = getChange().getCSN();
+      final CSN csn2 = o.getChange().getCSN();
+
+      return CSN.compare(csn1, csn2);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return getClass().getSimpleName() + " baseDN=" + baseDN
+          + " currentChange=" + currentChange + " open cursors=" + cursors;
+    }
+  }
+
+  /**
    * This map contains the List of updates received from each LDAP server.
    */
   private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
@@ -109,6 +214,12 @@
     {
       // empty
     }
+
+    @Override
+    public String toString()
+    {
+      return "EmptyReplicaDBCursor";
+    }
   };
 
   /**
@@ -368,13 +479,6 @@
 
   /** {@inheritDoc} */
   @Override
-  public Set<Integer> getDomainServerIds(DN baseDN)
-  {
-    return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
-  }
-
-  /** {@inheritDoc} */
-  @Override
   public long getCount(DN baseDN, int serverId, CSN from, CSN to)
   {
     JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -565,24 +669,45 @@
 
   /** {@inheritDoc} */
   @Override
-  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
-      CSN startAfterCSN)
+  public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
   {
-    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
-    if (replicaDB != null)
+    // Builds a new serverState for all the serverIds in the replication domain
+    // to ensure we get cursors starting after the provided CSN.
+    return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN));
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ReplicaDBCursor getCursorFrom(DN baseDN,
+      ServerState startAfterServerState)
+  {
+    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+  }
+
+  private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
+  {
+    final ServerState result = new ServerState();
+    if (startAfterCSN == null)
     {
-      try
+      return result;
+    }
+
+    for (int serverId : getDomainMap(baseDN).keySet())
+    {
+      if (serverId == startAfterCSN.getServerId())
       {
-        ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
-        cursor.next();
-        return cursor;
+        // reuse the provided CSN one as it is the most accurate
+        result.update(startAfterCSN);
       }
-      catch (ChangelogException e)
+      else
       {
-        // ignored
+        // build a new CSN, ignoring the seqNum since it is irrelevant for
+        // a different serverId
+        final CSN csn = startAfterCSN; // only used for increased readability
+        result.update(new CSN(csn.getTime(), 0, serverId));
       }
     }
-    return EMPTY_CURSOR;
+    return result;
   }
 
   /** {@inheritDoc} */

--
Gitblit v1.10.0