From d24edab25bfe93a3e3ea4eb38fb9860cef7aa034 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 13 Nov 2013 14:06:28 +0000
Subject: [PATCH] OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB 

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |  128 +++++++-----------------------------------
 1 files changed, 23 insertions(+), 105 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 30bdeff..fdc8f3e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@
   protected static final DebugTracer TRACER = getTracer();
 
   /**
-   * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
-   * a replication domain, advancing from the oldest to the newest change cross
-   * all replicaDBs.
-   */
-  private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
-  {
-
-    private final DN baseDN;
-    private UpdateMsg currentChange;
-    /**
-     * The cursors are sorted based on the current change of each cursor to
-     * consider the next change across all replicaDBs.
-     */
-    private final NavigableSet<DBCursor<UpdateMsg>> cursors =
-        new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
-        {
-
-          @Override
-          public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
-          {
-            final CSN csn1 = o1.getRecord().getCSN();
-            final CSN csn2 = o2.getRecord().getCSN();
-            return CSN.compare(csn1, csn2);
-          }
-        });
-
-    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
-        throws ChangelogException
-    {
-      this.baseDN = baseDN;
-      for (int serverId : getDomainMap(baseDN).keySet())
-      {
-        // get the last already sent CSN from that server to get a cursor
-        final CSN lastCSN = startAfterServerState.getCSN(serverId);
-        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
-      }
-    }
-
-    private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
-        CSN startAfterCSN) throws ChangelogException
-    {
-      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
-      if (replicaDB != null)
-      {
-        DBCursor<UpdateMsg> cursor =
-            replicaDB.generateCursorFrom(startAfterCSN);
-        cursor.next();
-        return cursor;
-      }
-      return EMPTY_CURSOR;
-    }
-
-    @Override
-    public boolean next() throws ChangelogException
-    {
-      if (cursors.isEmpty())
-      {
-        currentChange = null;
-        return false;
-      }
-
-      // To keep consistent the cursors' order in the SortedSet, it is necessary
-      // to remove and eventually add again a cursor (after moving it forward).
-      final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
-      currentChange = cursor.getRecord();
-      cursor.next();
-      addCursorIfNotEmpty(cursor);
-      return true;
-    }
-
-    void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
-    {
-      if (cursor.getRecord() != null)
-      {
-        cursors.add(cursor);
-      }
-      else
-      {
-        StaticUtils.close(cursor);
-      }
-    }
-
-    @Override
-    public UpdateMsg getRecord()
-    {
-      return currentChange;
-    }
-
-    @Override
-    public void close()
-    {
-      StaticUtils.close(cursors);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString()
-    {
-      return getClass().getSimpleName() + " baseDN=" + baseDN
-          + " currentChange=" + currentChange + " open cursors=" + cursors;
-    }
-  }
-
-  /**
    * This map contains the List of updates received from each LDAP server.
    * <p>
    * When removing a domainMap, code:
@@ -790,7 +686,29 @@
   public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
       ServerState startAfterServerState) throws ChangelogException
   {
-    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+    final List<DBCursor<UpdateMsg>> cursors =
+        new ArrayList<DBCursor<UpdateMsg>>(serverIds.size());
+    for (int serverId : serverIds)
+    {
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterServerState.getCSN(serverId);
+      cursors.add(getCursorFrom(baseDN, serverId, lastCSN));
+    }
+    return new CompositeDBCursor(cursors);
+  }
+
+  private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
+      CSN startAfterCSN) throws ChangelogException
+  {
+    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+    if (replicaDB != null)
+    {
+      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
+      cursor.next();
+      return cursor;
+    }
+    return EMPTY_CURSOR;
   }
 
   private ServerState buildServerState(DN baseDN, CSN startAfterCSN)

--
Gitblit v1.10.0