From bd3c137fd2e1fa9e13289ab0573e07f9a4212e05 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 30 Aug 2013 09:56:47 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/MessageHandler.java |  161 +++++++++++++++++++++++++++--------------------------
 1 files changed, 82 insertions(+), 79 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 739c502..15017c9 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,10 +27,7 @@
  */
 package org.opends.server.replication.server;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
@@ -253,9 +250,9 @@
    */
   protected UpdateMsg getNextMessage(boolean synchronous)
   {
-    UpdateMsg msg;
     while (activeConsumer)
     {
+			UpdateMsg msg;
       if (!following)
       {
         /* this server is late with regard to some other masters
@@ -285,32 +282,22 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          SortedSet<ReplicationIterator> iteratorSortedSet = null;
+					NavigableSet<ReplicationDBCursor> sortedCursors = null;
           try
           {
-            iteratorSortedSet = collectAllIteratorsWithChanges();
+            sortedCursors = collectAllCursorsWithChanges();
 
-            /* fill the lateQueue */
-            // The loop below relies on the fact that it is sorted based
-            // on the currentChange of each iterator to consider the next
-            // change across all servers.
-            //
-            // Hence it is necessary to remove and eventual add again an
-            // iterator when looping in order to keep consistent the order of
-            // the iterators (see ReplicationIteratorComparator.
-            while (!iteratorSortedSet.isEmpty()
-                && (lateQueue.count() < 100)
-                && (lateQueue.bytesCount() < 50000))
+						// fill the lateQueue
+            while (!sortedCursors.isEmpty()
+                && lateQueue.count() < 100
+                && lateQueue.bytesCount() < 50000)
             {
-              ReplicationIterator iterator = iteratorSortedSet.first();
-              iteratorSortedSet.remove(iterator);
-              lateQueue.add(iterator.getChange());
-              addIteratorIfNotEmpty(iteratorSortedSet, iterator);
+							lateQueue.add(nextOldestUpdateMsg(sortedCursors));
             }
           }
           finally
           {
-            close(iteratorSortedSet);
+            close(sortedCursors);
           }
 
           /*
@@ -328,7 +315,8 @@
                 following = true;
               }
             }
-          } else
+					}
+					else
           {
             /*
              * if the first change in the lateQueue is also on the regular
@@ -353,9 +341,10 @@
               }
             }
           }
-        } else
+				}
+				else
         {
-          /* get the next change from the lateQueue */
+					// get the next change from the lateQueue
           synchronized (msgQueue)
           {
             msg = lateQueue.removeFirst();
@@ -404,18 +393,38 @@
     return null;
   }
 
-  private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
-      ReplicationIterator iter)
-  {
-    if (iter.next())
-    {
-      iterators.add(iter);
-    }
-    else
-    {
-      close(iter);
-    }
-  }
+	private UpdateMsg nextOldestUpdateMsg(
+			NavigableSet<ReplicationDBCursor> sortedCursors)
+	{
+		/*
+		 * The cursors are sorted based on the currentChange of each cursor to
+		 * consider the next change across all servers.
+		 * To keep consistent the order of the cursors in the SortedSet,
+		 * it is necessary to remove and eventually add again a cursor (after moving
+		 * it forward).
+		 */
+		final ReplicationDBCursor cursor = sortedCursors.pollFirst();
+		final UpdateMsg result = cursor.getChange();
+		cursor.next();
+		addCursorIfNotEmpty(sortedCursors, cursor);
+		return result;
+	}
+
+	private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
+			ReplicationDBCursor cursor)
+	{
+		if (cursor != null)
+		{
+			if (cursor.getChange() != null)
+			{
+				cursors.add(cursor);
+			}
+			else
+			{
+				close(cursor);
+			}
+		}
+	}
 
   /**
    * Get the older Change Number for that server.
@@ -449,57 +458,51 @@
           We may be at the very moment when the writer has emptied the
           lateQueue when it sent the last update. The writer will fill again
           the lateQueue when it will send the next update but we are not yet
-          there. So let's take the last change not sent directly from
-          the db.
+          there. So let's take the last change not sent directly from the db.
           */
-          SortedSet<ReplicationIterator> iteratorSortedSet = null;
-          try
-          {
-            iteratorSortedSet = collectAllIteratorsWithChanges();
-            UpdateMsg msg = iteratorSortedSet.first().getChange();
-            result = msg.getChangeNumber();
-          } catch (Exception e)
-          {
-            result = null;
-          } finally
-          {
-            close(iteratorSortedSet);
-          }
+					result = findOldestChangeNumberFromReplicationDBs();
         }
       }
     }
     return result;
   }
 
-  private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
-  {
-    SortedSet<ReplicationIterator> results =
-        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
+	private ChangeNumber findOldestChangeNumberFromReplicationDBs()
+	{
+		SortedSet<ReplicationDBCursor> sortedCursors = null;
+		try
+		{
+		  sortedCursors = collectAllCursorsWithChanges();
+			UpdateMsg msg = sortedCursors.first().getChange();
+			return msg.getChangeNumber();
+		}
+		catch (Exception e)
+		{
+			return null;
+		}
+		finally
+		{
+		  close(sortedCursors);
+		}
+	}
 
-    // Build a list of candidates iterator (i.e. db i.e. server)
+	/**
+	 * Collects all the replication DB cursors that have changes and sort them
+	 * with the oldest {@link ChangeNumber} first.
+	 *
+	 * @return a List of cursors with changes sorted by their {@link ChangeNumber}
+	 *         (oldest first)
+	 */
+	private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+  {
+		final NavigableSet<ReplicationDBCursor> results =
+				new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
     for (int serverId : replicationServerDomain.getServerIds())
     {
-      // get the last already sent CN from that server
-      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-      // get an iterator in this server db from that last change
-      ReplicationIterator iter =
-        replicationServerDomain.getChangelogIterator(serverId, lastCsn);
-      /*
-      if that iterator has changes, then it is a candidate
-      it is added in the sorted list at a position given by its
-      current change (see ReplicationIteratorComparator).
-      */
-      if (iter != null)
-      {
-        if (iter.getChange() != null)
-        {
-          results.add(iter);
-        }
-        else
-        {
-          close(iter);
-        }
-      }
+      // get the last already sent CN from that server to get a cursor
+			final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+			addCursorIfNotEmpty(results,
+					replicationServerDomain.getCursorFrom(serverId, lastCsn));
     }
     return results;
   }

--
Gitblit v1.10.0