From c210356ee7f6b0037e2d672a4a11e83bbd160b0e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 08:59:16 +0000
Subject: [PATCH] MessageHandler.java: Extracted methods isMsgQueueAboveThreshold() and isMsgQueueBelowThreshold(). Extracted MsgQueue.consumeUpTo(UpdateMsg). Updated javadocs.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java |   45 +++++++++++++---------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java       |   24 +++++++++++-
 2 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index af01fa2..ed243c4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -49,6 +49,9 @@
 /**
  * This class implements a buffering/producer/consumer mechanism of
  * replication changes (UpdateMsg) used inside the replication server.
+ * It logically represents another RS than the current one, and is used when the
+ * other RS is brought online and needs to catch up with the changes in the
+ * current RS.
  *
  * MessageHandlers are registered into Replication server domains.
  * When an update message is received by a domain, the domain forwards
@@ -156,21 +159,31 @@
       /* TODO : size should be configurable
        * and larger than max-receive-queue-size
        */
-      while ((msgQueue.count() > maxQueueSize) ||
-          (msgQueue.bytesCount() > maxQueueBytesSize))
+      while (isMsgQueueAboveThreshold())
       {
         following = false;
         msgQueue.removeFirst();
       }
     }
   }
+
+  private boolean isMsgQueueAboveThreshold()
+  {
+    return msgQueue.count() > maxQueueSize
+        || msgQueue.bytesCount() > maxQueueBytesSize;
+  }
+
+  private boolean isMsgQueueBelowThreshold()
+  {
+    return !isMsgQueueAboveThreshold();
+  }
+
   /**
    * Set the shut down flag to true and returns the previous value of the flag.
    * @return The previous value of the shut down flag
    */
   public boolean engageShutdown()
   {
-    // Use thread safe boolean
     return shuttingDown.getAndSet(true);
   }
 
@@ -300,16 +313,16 @@
           }
 
           /*
-           * If the late queue is empty then we could not find any
-           * messages in the replication log so the remote serevr is not
-           * late anymore.
+           * If the late queue is empty then we could not find any messages in
+           * the replication log so the remote server is not late anymore.
            */
           if (lateQueue.isEmpty())
           {
             synchronized (msgQueue)
             {
-              if ((msgQueue.count() < maxQueueSize) &&
-                  (msgQueue.bytesCount() < maxQueueBytesSize))
+              // Ensure we are below threshold so this server will follow the
+              // msgQueue without fearing the msgQueue gets trimmed
+              if (isMsgQueueBelowThreshold())
               {
                 following = true;
               }
@@ -330,13 +343,9 @@
                 /* we finally catch up with the regular queue */
                 following = true;
                 lateQueue.clear();
-                UpdateMsg msg1;
-                do
-                {
-                  msg1 = msgQueue.removeFirst();
-                } while (!msg.getCSN().equals(msg1.getCSN()));
+                msgQueue.consumeUpTo(msg);
                 updateServerState(msg);
-                return msg1;
+                return msg;
               }
             }
           }
@@ -386,9 +395,9 @@
         }
       }
       /*
-       * Need to loop because following flag may have gone to false between
-       * the first check at the beginning of this method
-       * and the second check just above.
+       * Need to loop because following flag may have gone to false between the
+       * first check at the beginning of this method and the second check just
+       * above.
        */
     }
     return null;
@@ -497,7 +506,7 @@
         new TreeSet<ReplicaDBCursor>();
     for (int serverId : replicationServerDomain.getServerIds())
     {
-      // get the last already sent CN from that server to get a cursor
+      // get the last already sent CSN from that server to get a cursor
       final CSN lastCsn = serverState.getCSN(serverId);
       addCursorIfNotEmpty(results,
           replicationServerDomain.getCursorFrom(serverId, lastCsn));
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
index 80b8255..97c391c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -27,7 +27,7 @@
  */
 package org.opends.server.replication.server;
 
-import java.util.SortedMap;
+import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import org.opends.messages.Message;
@@ -43,7 +43,7 @@
  */
 public class MsgQueue
 {
-  private SortedMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
+  private NavigableMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
   private final Object lock = new Object();
 
   /** The total number of bytes for all the message in the queue. */
@@ -192,4 +192,24 @@
       bytesCount = 0;
     }
   }
+
+  /**
+   * Consumes all the messages in this queue up to and including the passed in
+   * message. If the passed in message is not contained in the current queue,
+   * then all messages will be removed from it.
+   *
+   * @param msg
+   *          the final message to reach when consuming messages from this queue
+   */
+  public void consumeUpTo(UpdateMsg msg)
+  {
+    UpdateMsg msg1;
+    do
+    {
+      // FIXME this code could be more efficient if the msgQueue could call the
+      // following code (to be tested):
+      // map.headMap(msg.getCSN(), true).clear()
+      msg1 = removeFirst();
+    } while (!msg.getCSN().equals(msg1.getCSN()));
+  }
 }

--
Gitblit v1.10.0