From a634c8d90fc2581a9486d91df07e874dda33b69e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 30 Aug 2013 10:07:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java |  153 +++++++++++++++++++++++++-------------------------
 1 files changed, 77 insertions(+), 76 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 15017c9..b69eb3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
   {
     while (activeConsumer)
     {
-			UpdateMsg msg;
       if (!following)
       {
         /* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
            *           restart as usual
            *   load this change on the delayList
            */
-					NavigableSet<ReplicationDBCursor> sortedCursors = null;
+          NavigableSet<ReplicationDBCursor> sortedCursors = null;
           try
           {
             sortedCursors = collectAllCursorsWithChanges();
 
-						// fill the lateQueue
+            // fill the lateQueue
             while (!sortedCursors.isEmpty()
                 && lateQueue.count() < 100
                 && lateQueue.bytesCount() < 50000)
             {
-							lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
             }
           }
           finally
@@ -315,15 +314,15 @@
                 following = true;
               }
             }
-					}
-					else
+          }
+          else
           {
             /*
              * if the first change in the lateQueue is also on the regular
              * queue, we can resume the processing from the regular queue
              * -> set following to true and empty the lateQueue.
              */
-            msg = lateQueue.first();
+            UpdateMsg msg = lateQueue.first();
             synchronized (msgQueue)
             {
               if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
               }
             }
           }
-				}
-				else
+        }
+        else
         {
-					// get the next change from the lateQueue
+          // get the next change from the lateQueue
+          UpdateMsg msg;
           synchronized (msgQueue)
           {
             msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
           return msg;
         }
       }
+
+
       synchronized (msgQueue)
       {
         if (following)
@@ -371,8 +373,7 @@
           {
             return null;
           }
-          msg = msgQueue.removeFirst();
-
+          UpdateMsg msg = msgQueue.removeFirst();
           if (updateServerState(msg))
           {
             /*
@@ -393,38 +394,38 @@
     return null;
   }
 
-	private UpdateMsg nextOldestUpdateMsg(
-			NavigableSet<ReplicationDBCursor> sortedCursors)
-	{
-		/*
-		 * The cursors are sorted based on the currentChange of each cursor to
-		 * consider the next change across all servers.
-		 * To keep consistent the order of the cursors in the SortedSet,
-		 * it is necessary to remove and eventually add again a cursor (after moving
-		 * it forward).
-		 */
-		final ReplicationDBCursor cursor = sortedCursors.pollFirst();
-		final UpdateMsg result = cursor.getChange();
-		cursor.next();
-		addCursorIfNotEmpty(sortedCursors, cursor);
-		return result;
-	}
+  private UpdateMsg nextOldestUpdateMsg(
+      NavigableSet<ReplicationDBCursor> sortedCursors)
+  {
+    /*
+     * The cursors are sorted based on the currentChange of each cursor to
+     * consider the next change across all servers.
+     * To keep consistent the order of the cursors in the SortedSet,
+     * it is necessary to remove and eventually add again a cursor (after moving
+     * it forward).
+     */
+    final ReplicationDBCursor cursor = sortedCursors.pollFirst();
+    final UpdateMsg result = cursor.getChange();
+    cursor.next();
+    addCursorIfNotEmpty(sortedCursors, cursor);
+    return result;
+  }
 
-	private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
-			ReplicationDBCursor cursor)
-	{
-		if (cursor != null)
-		{
-			if (cursor.getChange() != null)
-			{
-				cursors.add(cursor);
-			}
-			else
-			{
-				close(cursor);
-			}
-		}
-	}
+  private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
+      ReplicationDBCursor cursor)
+  {
+    if (cursor != null)
+    {
+      if (cursor.getChange() != null)
+      {
+        cursors.add(cursor);
+      }
+      else
+      {
+        close(cursor);
+      }
+    }
+  }
 
   /**
    * Get the older Change Number for that server.
@@ -460,49 +461,49 @@
           the lateQueue when it will send the next update but we are not yet
           there. So let's take the last change not sent directly from the db.
           */
-					result = findOldestChangeNumberFromReplicationDBs();
+          result = findOldestChangeNumberFromReplicationDBs();
         }
       }
     }
     return result;
   }
 
-	private ChangeNumber findOldestChangeNumberFromReplicationDBs()
-	{
-		SortedSet<ReplicationDBCursor> sortedCursors = null;
-		try
-		{
-		  sortedCursors = collectAllCursorsWithChanges();
-			UpdateMsg msg = sortedCursors.first().getChange();
-			return msg.getChangeNumber();
-		}
-		catch (Exception e)
-		{
-			return null;
-		}
-		finally
-		{
-		  close(sortedCursors);
-		}
-	}
-
-	/**
-	 * Collects all the replication DB cursors that have changes and sort them
-	 * with the oldest {@link ChangeNumber} first.
-	 *
-	 * @return a List of cursors with changes sorted by their {@link ChangeNumber}
-	 *         (oldest first)
-	 */
-	private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+  private ChangeNumber findOldestChangeNumberFromReplicationDBs()
   {
-		final NavigableSet<ReplicationDBCursor> results =
-				new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
+    SortedSet<ReplicationDBCursor> sortedCursors = null;
+    try
+    {
+      sortedCursors = collectAllCursorsWithChanges();
+      UpdateMsg msg = sortedCursors.first().getChange();
+      return msg.getChangeNumber();
+    }
+    catch (Exception e)
+    {
+      return null;
+    }
+    finally
+    {
+      close(sortedCursors);
+    }
+  }
+
+  /**
+   * Collects all the replication DB cursors that have changes and sort them
+   * with the oldest {@link ChangeNumber} first.
+   *
+   * @return a List of cursors with changes sorted by their {@link ChangeNumber}
+   *         (oldest first)
+   */
+  private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+  {
+    final NavigableSet<ReplicationDBCursor> results =
+        new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
     for (int serverId : replicationServerDomain.getServerIds())
     {
       // get the last already sent CN from that server to get a cursor
-			final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-			addCursorIfNotEmpty(results,
-					replicationServerDomain.getCursorFrom(serverId, lastCsn));
+      final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+      addCursorIfNotEmpty(results,
+          replicationServerDomain.getCursorFrom(serverId, lastCsn));
     }
     return results;
   }

--
Gitblit v1.10.0