From 0a51f5fbeb5e99c52f1be8973ae656de34fab75f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 09:30:53 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/MessageHandler.java |  143 +++++++++++++++++++++++------------------------
 1 files changed, 71 insertions(+), 72 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed652c7..f55298c 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@
            *           unlock memory tree
            *           restart as usual
            *   load this change on the delayList
-           *
            */
-          SortedSet<ReplicationIterator> iteratorSortedSet =
-              new TreeSet<ReplicationIterator>(
-                  new ReplicationIteratorComparator());
+          SortedSet<ReplicationIterator> iteratorSortedSet = null;
           try
           {
-            /* fill the lateQueue */
-            for (int serverId : replicationServerDomain.getServers())
-            {
-              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-              ReplicationIterator iterator = replicationServerDomain
-                  .getChangelogIterator(serverId, lastCsn);
-              if (iterator != null)
-              {
-                if (iterator.getChange() != null)
-                {
-                  iteratorSortedSet.add(iterator);
-                }
-                else
-                {
-                  iterator.releaseCursor();
-                }
-              }
-            }
+            iteratorSortedSet = collectAllIteratorsWithChanges();
 
+            /* fill the lateQueue */
             // The loop below relies on the fact that it is sorted based
             // on the currentChange of each iterator to consider the next
             // change across all servers.
@@ -320,22 +301,12 @@
               ReplicationIterator iterator = iteratorSortedSet.first();
               iteratorSortedSet.remove(iterator);
               lateQueue.add(iterator.getChange());
-              if (iterator.next())
-              {
-                iteratorSortedSet.add(iterator);
-              }
-              else
-              {
-                iterator.releaseCursor();
-              }
+              addIteratorIfNotEmpty(iteratorSortedSet, iterator);
             }
           }
           finally
           {
-            for (ReplicationIterator iterator : iteratorSortedSet)
-            {
-              iterator.releaseCursor();
-            }
+            releaseAllIterators(iteratorSortedSet);
           }
 
           /*
@@ -343,7 +314,6 @@
            * messages in the replication log so the remote serevr is not
            * late anymore.
            */
-
           if (lateQueue.isEmpty())
           {
             synchronized (msgQueue)
@@ -430,6 +400,19 @@
     return null;
   }
 
+  private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
+      ReplicationIterator iter)
+  {
+    if (iter.next())
+    {
+      iterators.add(iter);
+    }
+    else
+    {
+      iter.releaseCursor();
+    }
+  }
+
   /**
    * Get the older Change Number for that server.
    * Returns null when the queue is empty.
@@ -450,7 +433,12 @@
       }
       else
       {
-        if (lateQueue.isEmpty())
+        if (!lateQueue.isEmpty())
+        {
+          UpdateMsg msg = lateQueue.first();
+          result = msg.getChangeNumber();
+        }
+        else
         {
           /*
           following is false AND lateQueue is empty
@@ -460,36 +448,10 @@
           there. So let's take the last change not sent directly from
           the db.
           */
-          SortedSet<ReplicationIterator> iteratorSortedSet =
-              new TreeSet<ReplicationIterator>(
-                  new ReplicationIteratorComparator());
+          SortedSet<ReplicationIterator> iteratorSortedSet = null;
           try
           {
-            // Build a list of candidates iterator (i.e. db i.e. server)
-            for (int serverId : replicationServerDomain.getServers())
-            {
-              // get the last already sent CN from that server
-              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-              // get an iterator in this server db from that last change
-              ReplicationIterator iterator =
-                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
-              /*
-              if that iterator has changes, then it is a candidate
-              it is added in the sorted list at a position given by its
-              current change (see ReplicationIteratorComparator).
-              */
-              if (iterator != null)
-              {
-                if (iterator.getChange() != null)
-                {
-                  iteratorSortedSet.add(iterator);
-                }
-                else
-                {
-                  iterator.releaseCursor();
-                }
-              }
-            }
+            iteratorSortedSet = collectAllIteratorsWithChanges();
             UpdateMsg msg = iteratorSortedSet.first().getChange();
             result = msg.getChangeNumber();
           } catch (Exception e)
@@ -497,21 +459,58 @@
             result = null;
           } finally
           {
-            for (ReplicationIterator iterator : iteratorSortedSet)
-            {
-              iterator.releaseCursor();
-            }
+            releaseAllIterators(iteratorSortedSet);
           }
-        } else
-        {
-          UpdateMsg msg = lateQueue.first();
-          result = msg.getChangeNumber();
         }
       }
     }
     return result;
   }
 
+  private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
+  {
+    SortedSet<ReplicationIterator> results =
+        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
+
+    // Build a list of candidates iterator (i.e. db i.e. server)
+    for (int serverId : replicationServerDomain.getServers())
+    {
+      // get the last already sent CN from that server
+      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+      // get an iterator in this server db from that last change
+      ReplicationIterator iter =
+        replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+      /*
+      if that iterator has changes, then it is a candidate
+      it is added in the sorted list at a position given by its
+      current change (see ReplicationIteratorComparator).
+      */
+      if (iter != null)
+      {
+        if (iter.getChange() != null)
+        {
+          results.add(iter);
+        }
+        else
+        {
+          iter.releaseCursor();
+        }
+      }
+    }
+    return results;
+  }
+
+  private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
+  {
+    if (iterators != null)
+    {
+      for (ReplicationIterator iter : iterators)
+      {
+        iter.releaseCursor();
+      }
+    }
+  }
+
   /**
    * Get the count of updates sent to this server.
    * @return  The count of update sent to this server.

--
Gitblit v1.10.0