From d24edab25bfe93a3e3ea4eb38fb9860cef7aa034 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 13 Nov 2013 14:06:28 +0000
Subject: [PATCH] OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB 

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java     |  128 ++++---------------------
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java |  144 ++++++++++++++++++++++++++++
 2 files changed, 167 insertions(+), 105 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
new file mode 100644
index 0000000..75eb518
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -0,0 +1,144 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.*;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.util.StaticUtils;
+
+/**
+ * {@link DBCursor} implementation that iterates across a Collection of
+ * {@link DBCursor}s, advancing from the oldest to the newest change cross all
+ * cursors.
+ */
+final class CompositeDBCursor implements DBCursor<UpdateMsg>
+{
+
+  private UpdateMsg currentChange;
+  private final List<DBCursor<UpdateMsg>> exhaustedCursors =
+      new ArrayList<DBCursor<UpdateMsg>>();
+  /**
+   * The cursors are sorted based on the current change of each cursor to
+   * consider the next change across all available cursors.
+   */
+  private final NavigableSet<DBCursor<UpdateMsg>> cursors =
+      new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
+      {
+        @Override
+        public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
+        {
+          final CSN csn1 = o1.getRecord().getCSN();
+          final CSN csn2 = o2.getRecord().getCSN();
+          return CSN.compare(csn1, csn2);
+        }
+      });
+
+  /**
+   * Builds a CompositeDBCursor using the provided collection of cursors.
+   *
+   * @param cursors
+   *          the cursors that will be iterated upon.
+   */
+  public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
+  {
+    for (DBCursor<UpdateMsg> cursor : cursors)
+    {
+      add(cursor);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next() throws ChangelogException
+  {
+    // try to recycle empty cursors in case the underlying ReplicaDBs received
+    // new changes
+    for (Iterator<DBCursor<UpdateMsg>> iter = exhaustedCursors.iterator(); iter
+        .hasNext();)
+    {
+      DBCursor<UpdateMsg> cursor = iter.next();
+      iter.remove();
+      cursor.next();
+      add(cursor);
+    }
+
+    if (cursors.isEmpty())
+    {
+      // no cursors are left with changes.
+      currentChange = null;
+      return false;
+    }
+
+    // To keep consistent the cursors' order in the SortedSet, it is necessary
+    // to remove and eventually add again a cursor (after moving it forward).
+    final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
+    currentChange = cursor.getRecord();
+    cursor.next();
+    add(cursor);
+    return true;
+  }
+
+  private void add(DBCursor<UpdateMsg> cursor)
+  {
+    if (cursor.getRecord() != null)
+    {
+      this.cursors.add(cursor);
+    }
+    else
+    {
+      this.exhaustedCursors.add(cursor);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public UpdateMsg getRecord()
+  {
+    return currentChange;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    StaticUtils.close(cursors);
+    StaticUtils.close(exhaustedCursors);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " currentChange=" + currentChange
+        + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 30bdeff..fdc8f3e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@
   protected static final DebugTracer TRACER = getTracer();
 
   /**
-   * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
-   * a replication domain, advancing from the oldest to the newest change cross
-   * all replicaDBs.
-   */
-  private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
-  {
-
-    private final DN baseDN;
-    private UpdateMsg currentChange;
-    /**
-     * The cursors are sorted based on the current change of each cursor to
-     * consider the next change across all replicaDBs.
-     */
-    private final NavigableSet<DBCursor<UpdateMsg>> cursors =
-        new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
-        {
-
-          @Override
-          public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
-          {
-            final CSN csn1 = o1.getRecord().getCSN();
-            final CSN csn2 = o2.getRecord().getCSN();
-            return CSN.compare(csn1, csn2);
-          }
-        });
-
-    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
-        throws ChangelogException
-    {
-      this.baseDN = baseDN;
-      for (int serverId : getDomainMap(baseDN).keySet())
-      {
-        // get the last already sent CSN from that server to get a cursor
-        final CSN lastCSN = startAfterServerState.getCSN(serverId);
-        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
-      }
-    }
-
-    private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
-        CSN startAfterCSN) throws ChangelogException
-    {
-      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
-      if (replicaDB != null)
-      {
-        DBCursor<UpdateMsg> cursor =
-            replicaDB.generateCursorFrom(startAfterCSN);
-        cursor.next();
-        return cursor;
-      }
-      return EMPTY_CURSOR;
-    }
-
-    @Override
-    public boolean next() throws ChangelogException
-    {
-      if (cursors.isEmpty())
-      {
-        currentChange = null;
-        return false;
-      }
-
-      // To keep consistent the cursors' order in the SortedSet, it is necessary
-      // to remove and eventually add again a cursor (after moving it forward).
-      final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
-      currentChange = cursor.getRecord();
-      cursor.next();
-      addCursorIfNotEmpty(cursor);
-      return true;
-    }
-
-    void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
-    {
-      if (cursor.getRecord() != null)
-      {
-        cursors.add(cursor);
-      }
-      else
-      {
-        StaticUtils.close(cursor);
-      }
-    }
-
-    @Override
-    public UpdateMsg getRecord()
-    {
-      return currentChange;
-    }
-
-    @Override
-    public void close()
-    {
-      StaticUtils.close(cursors);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString()
-    {
-      return getClass().getSimpleName() + " baseDN=" + baseDN
-          + " currentChange=" + currentChange + " open cursors=" + cursors;
-    }
-  }
-
-  /**
    * This map contains the List of updates received from each LDAP server.
    * <p>
    * When removing a domainMap, code:
@@ -790,7 +686,29 @@
   public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
       ServerState startAfterServerState) throws ChangelogException
   {
-    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+    final List<DBCursor<UpdateMsg>> cursors =
+        new ArrayList<DBCursor<UpdateMsg>>(serverIds.size());
+    for (int serverId : serverIds)
+    {
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterServerState.getCSN(serverId);
+      cursors.add(getCursorFrom(baseDN, serverId, lastCSN));
+    }
+    return new CompositeDBCursor(cursors);
+  }
+
+  private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
+      CSN startAfterCSN) throws ChangelogException
+  {
+    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+    if (replicaDB != null)
+    {
+      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
+      cursor.next();
+      return cursor;
+    }
+    return EMPTY_CURSOR;
   }
 
   private ServerState buildServerState(DN baseDN, CSN startAfterCSN)

--
Gitblit v1.10.0