From a634c8d90fc2581a9486d91df07e874dda33b69e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 30 Aug 2013 10:07:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java                      |   26 ++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java                              |  153 +++++++++++++++---------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java           |   24 ++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java          |   34 +++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                     |   23 ++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java |   17 +-
 6 files changed, 138 insertions(+), 139 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 15017c9..b69eb3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
   {
     while (activeConsumer)
     {
-			UpdateMsg msg;
       if (!following)
       {
         /* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
            *           restart as usual
            *   load this change on the delayList
            */
-					NavigableSet<ReplicationDBCursor> sortedCursors = null;
+          NavigableSet<ReplicationDBCursor> sortedCursors = null;
           try
           {
             sortedCursors = collectAllCursorsWithChanges();
 
-						// fill the lateQueue
+            // fill the lateQueue
             while (!sortedCursors.isEmpty()
                 && lateQueue.count() < 100
                 && lateQueue.bytesCount() < 50000)
             {
-							lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
             }
           }
           finally
@@ -315,15 +314,15 @@
                 following = true;
               }
             }
-					}
-					else
+          }
+          else
           {
             /*
              * if the first change in the lateQueue is also on the regular
              * queue, we can resume the processing from the regular queue
              * -> set following to true and empty the lateQueue.
              */
-            msg = lateQueue.first();
+            UpdateMsg msg = lateQueue.first();
             synchronized (msgQueue)
             {
               if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
               }
             }
           }
-				}
-				else
+        }
+        else
         {
-					// get the next change from the lateQueue
+          // get the next change from the lateQueue
+          UpdateMsg msg;
           synchronized (msgQueue)
           {
             msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
           return msg;
         }
       }
+
+
       synchronized (msgQueue)
       {
         if (following)
@@ -371,8 +373,7 @@
           {
             return null;
           }
-          msg = msgQueue.removeFirst();
-
+          UpdateMsg msg = msgQueue.removeFirst();
           if (updateServerState(msg))
           {
             /*
@@ -393,38 +394,38 @@
     return null;
   }
 
-	private UpdateMsg nextOldestUpdateMsg(
-			NavigableSet<ReplicationDBCursor> sortedCursors)
-	{
-		/*
-		 * The cursors are sorted based on the currentChange of each cursor to
-		 * consider the next change across all servers.
-		 * To keep consistent the order of the cursors in the SortedSet,
-		 * it is necessary to remove and eventually add again a cursor (after moving
-		 * it forward).
-		 */
-		final ReplicationDBCursor cursor = sortedCursors.pollFirst();
-		final UpdateMsg result = cursor.getChange();
-		cursor.next();
-		addCursorIfNotEmpty(sortedCursors, cursor);
-		return result;
-	}
+  private UpdateMsg nextOldestUpdateMsg(
+      NavigableSet<ReplicationDBCursor> sortedCursors)
+  {
+    /*
+     * The cursors are sorted based on the currentChange of each cursor to
+     * consider the next change across all servers.
+     * To keep consistent the order of the cursors in the SortedSet,
+     * it is necessary to remove and eventually add again a cursor (after moving
+     * it forward).
+     */
+    final ReplicationDBCursor cursor = sortedCursors.pollFirst();
+    final UpdateMsg result = cursor.getChange();
+    cursor.next();
+    addCursorIfNotEmpty(sortedCursors, cursor);
+    return result;
+  }
 
-	private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
-			ReplicationDBCursor cursor)
-	{
-		if (cursor != null)
-		{
-			if (cursor.getChange() != null)
-			{
-				cursors.add(cursor);
-			}
-			else
-			{
-				close(cursor);
-			}
-		}
-	}
+  private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
+      ReplicationDBCursor cursor)
+  {
+    if (cursor != null)
+    {
+      if (cursor.getChange() != null)
+      {
+        cursors.add(cursor);
+      }
+      else
+      {
+        close(cursor);
+      }
+    }
+  }
 
   /**
    * Get the older Change Number for that server.
@@ -460,49 +461,49 @@
           the lateQueue when it will send the next update but we are not yet
           there. So let's take the last change not sent directly from the db.
           */
-					result = findOldestChangeNumberFromReplicationDBs();
+          result = findOldestChangeNumberFromReplicationDBs();
         }
       }
     }
     return result;
   }
 
-	private ChangeNumber findOldestChangeNumberFromReplicationDBs()
-	{
-		SortedSet<ReplicationDBCursor> sortedCursors = null;
-		try
-		{
-		  sortedCursors = collectAllCursorsWithChanges();
-			UpdateMsg msg = sortedCursors.first().getChange();
-			return msg.getChangeNumber();
-		}
-		catch (Exception e)
-		{
-			return null;
-		}
-		finally
-		{
-		  close(sortedCursors);
-		}
-	}
-
-	/**
-	 * Collects all the replication DB cursors that have changes and sort them
-	 * with the oldest {@link ChangeNumber} first.
-	 *
-	 * @return a List of cursors with changes sorted by their {@link ChangeNumber}
-	 *         (oldest first)
-	 */
-	private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+  private ChangeNumber findOldestChangeNumberFromReplicationDBs()
   {
-		final NavigableSet<ReplicationDBCursor> results =
-				new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
+    SortedSet<ReplicationDBCursor> sortedCursors = null;
+    try
+    {
+      sortedCursors = collectAllCursorsWithChanges();
+      UpdateMsg msg = sortedCursors.first().getChange();
+      return msg.getChangeNumber();
+    }
+    catch (Exception e)
+    {
+      return null;
+    }
+    finally
+    {
+      close(sortedCursors);
+    }
+  }
+
+  /**
+   * Collects all the replication DB cursors that have changes and sort them
+   * with the oldest {@link ChangeNumber} first.
+   *
+   * @return a List of cursors with changes sorted by their {@link ChangeNumber}
+   *         (oldest first)
+   */
+  private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+  {
+    final NavigableSet<ReplicationDBCursor> results =
+        new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
     for (int serverId : replicationServerDomain.getServerIds())
     {
       // get the last already sent CN from that server to get a cursor
-			final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-			addCursorIfNotEmpty(results,
-					replicationServerDomain.getCursorFrom(serverId, lastCsn));
+      final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+      addCursorIfNotEmpty(results,
+          replicationServerDomain.getCursorFrom(serverId, lastCsn));
     }
     return results;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index c88e37c..3b0b472 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -49,7 +49,6 @@
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
 import org.opends.server.replication.server.changelog.je.DbHandler;
-import org.opends.server.replication.server.changelog.je.ReplicationDB;
 import org.opends.server.types.*;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -1263,17 +1262,17 @@
   }
 
   /**
-	 * Creates and returns a cursor. When the cursor is not used anymore, the
-	 * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
-	 * resources and locks used by the cursor.
-	 * 
-	 * @param serverId
-	 *          Identifier of the server for which the cursor is created.
-	 * @param startAfterCN
-	 *          Starting point for the cursor.
-	 * @return the created {@link ReplicationDB}. Null when no DB is available or
-	 *         the DB is empty for the provided serverId .
-	 */
+   * Creates and returns a cursor. When the cursor is not used anymore, the
+   * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
+   * resources and locks used by the cursor.
+   *
+   * @param serverId
+   *          Identifier of the server for which the cursor is created.
+   * @param startAfterCN
+   *          Starting point for the cursor.
+   * @return the created {@link ReplicationDBCursor}. Null when no DB is
+   *         available or the DB is empty for the provided serverId .
+   */
   public ReplicationDBCursor getCursorFrom(int serverId,
       ChangeNumber startAfterCN)
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
index 74e69d4..2e126da 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
@@ -38,24 +38,24 @@
 {
 
   /**
-	 * Get the UpdateMsg where the cursor is currently set.
-	 * 
-	 * @return The UpdateMsg where the cursor is currently set.
-	 */
+   * Get the UpdateMsg where the cursor is currently set.
+   *
+   * @return The UpdateMsg where the cursor is currently set.
+   */
   UpdateMsg getChange();
 
   /**
-	 * Go to the next change in the ReplicationDB or in the server Queue.
-	 * 
-	 * @return false if the cursor is already on the last change before this call.
-	 */
+   * Go to the next change in the ReplicationDB or in the server Queue.
+   *
+   * @return false if the cursor is already on the last change before this call.
+   */
   boolean next();
 
   /**
-	 * Release the resources and locks used by this cursor. This method must be
-	 * called when the cursor is no longer used. Failure to do it could cause DB
-	 * deadlock.
-	 */
+   * Release the resources and locks used by this cursor. This method must be
+   * called when the cursor is no longer used. Failure to do it could cause DB
+   * deadlock.
+   */
   @Override
   void close();
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
index a0d9069..a716210 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
@@ -30,7 +30,6 @@
 import java.util.Comparator;
 
 import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.protocol.UpdateMsg;
 
 /**
  * This class defines a {@link Comparator} that allows to know which
@@ -41,14 +40,14 @@
               implements Comparator<ReplicationDBCursor>
 {
   /**
-	 * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
-	 * 
-	 * @param o1
-	 *          first cursor.
-	 * @param o2
-	 *          second cursor.
-	 * @return result of the comparison.
-	 */
+   * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
+   *
+   * @param o1
+   *          first cursor.
+   * @param o2
+   *          second cursor.
+   * @return result of the comparison.
+   */
   @Override
   public int compare(ReplicationDBCursor o1, ReplicationDBCursor o2)
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index e4f4dd8..ea2991b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,7 +44,7 @@
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.InitializationException;
@@ -260,18 +260,18 @@
   }
 
   /**
-	 * Generate a new {@link ReplicationDBCursor} that allows to browse the db
-	 * managed by this dbHandler and starting at the position defined by a given
-	 * changeNumber.
-	 *
-	 * @param startAfterCN
-	 *          The position where the cursor must start.
-	 * @return a new {@link ReplicationDBCursor} that allows to browse the db
-	 *         managed by this dbHandler and starting at the position defined by a
-	 *         given changeNumber.
-	 * @throws ChangelogException
-	 *           if a database problem happened.
-	 */
+   * Generate a new {@link ReplicationDBCursor} that allows to browse the db
+   * managed by this dbHandler and starting at the position defined by a given
+   * changeNumber.
+   *
+   * @param startAfterCN
+   *          The position where the cursor must start.
+   * @return a new {@link ReplicationDBCursor} that allows to browse the db
+   *         managed by this dbHandler and starting at the position defined by a
+   *         given changeNumber.
+   * @throws ChangelogException
+   *           if a database problem happened.
+   */
   public ReplicationDBCursor generateCursorFrom(ChangeNumber startAfterCN)
       throws ChangelogException
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
index a096631..7f2c7af 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
@@ -32,7 +32,7 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
 
 /**
  * Berkeley DB JE implementation of {@link ReplicationDBCursor}.
@@ -46,18 +46,18 @@
   private ChangeNumber lastNonNullCurrentCN;
 
   /**
-	 * Creates a new JEReplicationDBCursor. All created cursor must be released by
-	 * the caller using the {@link #close()} method.
-	 *
-	 * @param db
-	 *          The db where the cursor must be created.
-	 * @param startAfterCN
-	 *          The ChangeNumber after which the cursor must start.
-	 * @param dbHandler
-	 *          The associated DbHandler.
-	 * @throws ChangelogException
-	 *           if a database problem happened.
-	 */
+   * Creates a new JEReplicationDBCursor. All created cursor must be released by
+   * the caller using the {@link #close()} method.
+   *
+   * @param db
+   *          The db where the cursor must be created.
+   * @param startAfterCN
+   *          The ChangeNumber after which the cursor must start.
+   * @param dbHandler
+   *          The associated DbHandler.
+   * @throws ChangelogException
+   *           if a database problem happened.
+   */
   public JEReplicationDBCursor(ReplicationDB db, ChangeNumber startAfterCN,
       DbHandler dbHandler) throws ChangelogException
   {
@@ -151,10 +151,10 @@
   }
 
   /**
-	 * Called by the Gc when the object is garbage collected Release the internal
-	 * cursor in case the cursor was badly used and {@link #close()} was never
-	 * called.
-	 */
+   * Called by the Gc when the object is garbage collected Release the internal
+   * cursor in case the cursor was badly used and {@link #close()} was never
+   * called.
+   */
   @Override
   protected void finalize()
   {

--
Gitblit v1.10.0