From 4936231f6b43a59233dc4ee909d9a2eeb3ced31a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 13:52:43 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/MessageHandler.java |   76 +++++++-------------------------------
 1 files changed, 14 insertions(+), 62 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed243c4..07f2371 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@
  */
 package org.opends.server.replication.server;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
@@ -294,22 +295,19 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          NavigableSet<ReplicaDBCursor> sortedCursors = null;
+          ReplicaDBCursor cursor = null;
           try
           {
-            sortedCursors = collectAllCursorsWithChanges();
-
             // fill the lateQueue
-            while (!sortedCursors.isEmpty()
-                && lateQueue.count() < 100
-                && lateQueue.bytesCount() < 50000)
+            cursor = replicationServerDomain.getCursorFrom(serverState);
+            while (cursor.next() && isLateQueueBelowThreshold())
             {
-              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+              lateQueue.add(cursor.getChange());
             }
           }
           finally
           {
-            close(sortedCursors);
+            close(cursor);
           }
 
           /*
@@ -403,34 +401,9 @@
     return null;
   }
 
-  private UpdateMsg nextOldestUpdateMsg(
-      NavigableSet<ReplicaDBCursor> sortedCursors)
+  private boolean isLateQueueBelowThreshold()
   {
-    /*
-     * The cursors are sorted based on the currentChange of each cursor to
-     * consider the next change across all servers.
-     * To keep consistent the order of the cursors in the SortedSet,
-     * it is necessary to remove and eventually add again a cursor (after moving
-     * it forward).
-     */
-    final ReplicaDBCursor cursor = sortedCursors.pollFirst();
-    final UpdateMsg result = cursor.getChange();
-    cursor.next();
-    addCursorIfNotEmpty(sortedCursors, cursor);
-    return result;
-  }
-
-  private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
-      ReplicaDBCursor cursor)
-  {
-    if (cursor.getChange() != null)
-    {
-      cursors.add(cursor);
-    }
-    else
-    {
-      close(cursor);
-    }
+    return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
   }
 
   /**
@@ -476,12 +449,12 @@
 
   private CSN findOldestCSNFromReplicaDBs()
   {
-    SortedSet<ReplicaDBCursor> sortedCursors = null;
+    ReplicaDBCursor cursor = null;
     try
     {
-      sortedCursors = collectAllCursorsWithChanges();
-      UpdateMsg msg = sortedCursors.first().getChange();
-      return msg.getCSN();
+      cursor = replicationServerDomain.getCursorFrom(serverState);
+      cursor.next();
+      return cursor.getChange().getCSN();
     }
     catch (Exception e)
     {
@@ -489,32 +462,11 @@
     }
     finally
     {
-      close(sortedCursors);
+      close(cursor);
     }
   }
 
   /**
-   * Collects all the {@link ReplicaDBCursor}s that have changes and sort them
-   * with the oldest {@link CSN} first.
-   *
-   * @return a List of cursors with changes sorted by their {@link CSN}
-   *         (oldest first)
-   */
-  private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
-  {
-    final NavigableSet<ReplicaDBCursor> results =
-        new TreeSet<ReplicaDBCursor>();
-    for (int serverId : replicationServerDomain.getServerIds())
-    {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCsn = serverState.getCSN(serverId);
-      addCursorIfNotEmpty(results,
-          replicationServerDomain.getCursorFrom(serverId, lastCsn));
-    }
-    return results;
-  }
-
-  /**
    * Get the count of updates sent to this server.
    * @return  The count of update sent to this server.
    */

--
Gitblit v1.10.0