From f312ec4a15ca08a406c045748e9d627fe1e31494 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 17 Nov 2006 13:46:39 +0000
Subject: [PATCH] The synchronization changelog monitoring information has a counter named waiting-changes that publish the number of updates known by the changelog server that have not yest been sent to each ldap server because they are too slow to replay them.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java |   65 +++++++++++++++++++++++++-------
 1 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 10663dc..a8785c3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -85,6 +85,7 @@
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
   private int maxSendDelay = 0;
+  private int maxQueueSize = 10000;
   private int restartReceiveQueue;
   private int restartSendQueue;
   private int restartReceiveDelay;
@@ -109,13 +110,16 @@
   /**
    * Creates a new server handler instance with the provided socket.
    *
-   * @param  session The ProtocolSession used by the ServerHandler to
+   * @param session The ProtocolSession used by the ServerHandler to
    *                 communicate with the remote entity.
+   * @param queueSize The maximum number of update that will be kept
+   *                  in memory by this ServerHandler.
    */
-  public ServerHandler(ProtocolSession session)
+  public ServerHandler(ProtocolSession session, int queueSize)
   {
     super("Server Handler");
     this.session = session;
+    this.maxQueueSize = queueSize;
   }
 
   /**
@@ -467,19 +471,50 @@
   {
    synchronized (msgQueue)
    {
-     /*
-      * TODO : When the server  is not able to follow, the msgQueue
-      * may become too large and therefore won't contain all the
-      * changes. Some changes may only be stored in the backing DB
-      * of the servers.
-      * The calculation should be done by asking to the each dbHandler
-      * how many changes need to be replicated and making the sum
-      * For now just return maxint in this case
-      */
+    /*
+     * When the server is up to date or close to be up to date,
+     * the number of updates to be sent is the size of the receive queue.
+     */
      if (isFollowing())
        return msgQueue.size();
      else
-       return Integer.MAX_VALUE;
+     {
+       /*
+        * When the server  is not able to follow, the msgQueue
+        * may become too large and therefore won't contain all the
+        * changes. Some changes may only be stored in the backing DB
+        * of the servers.
+        * The total size of teh receieve queue is calculated by doing
+        * the sum of the number of missing changes for every dbHandler.
+        */
+       int totalCount = 0;
+       ServerState dbState = changelogCache.getDbServerState();
+       for (short id : dbState)
+       {
+         int max = dbState.getMaxChangeNumber(id).getSeqnum();
+         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
+         if (currentChange != null)
+         {
+           int current = currentChange.getSeqnum();
+           if (current == max)
+           {
+           }
+           else if (current < max)
+           {
+             totalCount += max - current;
+           }
+           else
+           {
+             totalCount += Integer.MAX_VALUE - (current - max) + 1;
+           }
+         }
+         else
+         {
+           totalCount += max;
+         }
+       }
+       return totalCount;
+     }
    }
   }
 
@@ -576,7 +611,7 @@
       /* TODO : size should be configurable
        * and larger than max-receive-queue-size
        */
-      while (msgQueue.size() > 10000)
+      while (msgQueue.size() > maxQueueSize)
       {
         following = false;
         msgQueue.removeFirst();
@@ -687,7 +722,7 @@
           {
             synchronized (msgQueue)
             {
-              if (msgQueue.size() < 10000)
+              if (msgQueue.size() < maxQueueSize)
               {
                 following = true;
               }
@@ -1026,6 +1061,8 @@
                                  baseDn.toString()));
     attributes.add(new Attribute("waiting-changes",
                                  String.valueOf(getRcvMsgQueueSize())));
+    attributes.add(new Attribute("max-waiting-changes",
+                                 String.valueOf(maxQueueSize)));
     attributes.add(new Attribute("update-waiting-acks",
                                  String.valueOf(getWaitingAckSize())));
     attributes.add(new Attribute("update-sent",

--
Gitblit v1.10.0