From 19298042f99ef0a659f9271d022f58884a18704b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 08:59:16 +0000
Subject: [PATCH] MessageHandler.java: Extracted methods isMsgQueueAboveThreshold() and isMsgQueueBelowThreshold(). Extracted MsgQueue.consumeUpTo(UpdateMsg). Updated javadocs.

---
 opends/src/server/org/opends/server/replication/server/MessageHandler.java |   45 +++++++++++++++++++++++++++------------------
 1 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index af01fa2..ed243c4 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -49,6 +49,9 @@
 /**
  * This class implements a buffering/producer/consumer mechanism of
  * replication changes (UpdateMsg) used inside the replication server.
+ * It logically represents another RS than the current one, and is used when the
+ * other RS is brought online and needs to catch up with the changes in the
+ * current RS.
  *
  * MessageHandlers are registered into Replication server domains.
  * When an update message is received by a domain, the domain forwards
@@ -156,21 +159,31 @@
       /* TODO : size should be configurable
        * and larger than max-receive-queue-size
        */
-      while ((msgQueue.count() > maxQueueSize) ||
-          (msgQueue.bytesCount() > maxQueueBytesSize))
+      while (isMsgQueueAboveThreshold())
       {
         following = false;
         msgQueue.removeFirst();
       }
     }
   }
+
+  private boolean isMsgQueueAboveThreshold()
+  {
+    return msgQueue.count() > maxQueueSize
+        || msgQueue.bytesCount() > maxQueueBytesSize;
+  }
+
+  private boolean isMsgQueueBelowThreshold()
+  {
+    return !isMsgQueueAboveThreshold();
+  }
+
   /**
    * Set the shut down flag to true and returns the previous value of the flag.
    * @return The previous value of the shut down flag
    */
   public boolean engageShutdown()
   {
-    // Use thread safe boolean
     return shuttingDown.getAndSet(true);
   }
 
@@ -300,16 +313,16 @@
           }
 
           /*
-           * If the late queue is empty then we could not find any
-           * messages in the replication log so the remote serevr is not
-           * late anymore.
+           * If the late queue is empty then we could not find any messages in
+           * the replication log so the remote server is not late anymore.
            */
           if (lateQueue.isEmpty())
           {
             synchronized (msgQueue)
             {
-              if ((msgQueue.count() < maxQueueSize) &&
-                  (msgQueue.bytesCount() < maxQueueBytesSize))
+              // Ensure we are below threshold so this server will follow the
+              // msgQueue without fearing the msgQueue gets trimmed
+              if (isMsgQueueBelowThreshold())
               {
                 following = true;
               }
@@ -330,13 +343,9 @@
                 /* we finally catch up with the regular queue */
                 following = true;
                 lateQueue.clear();
-                UpdateMsg msg1;
-                do
-                {
-                  msg1 = msgQueue.removeFirst();
-                } while (!msg.getCSN().equals(msg1.getCSN()));
+                msgQueue.consumeUpTo(msg);
                 updateServerState(msg);
-                return msg1;
+                return msg;
               }
             }
           }
@@ -386,9 +395,9 @@
         }
       }
       /*
-       * Need to loop because following flag may have gone to false between
-       * the first check at the beginning of this method
-       * and the second check just above.
+       * Need to loop because following flag may have gone to false between the
+       * first check at the beginning of this method and the second check just
+       * above.
        */
     }
     return null;
@@ -497,7 +506,7 @@
         new TreeSet<ReplicaDBCursor>();
     for (int serverId : replicationServerDomain.getServerIds())
     {
-      // get the last already sent CN from that server to get a cursor
+      // get the last already sent CSN from that server to get a cursor
       final CSN lastCsn = serverState.getCSN(serverId);
       addCursorIfNotEmpty(results,
           replicationServerDomain.getCursorFrom(serverId, lastCsn));

--
Gitblit v1.10.0