From 66f48672bc7953e77364a9a7ae41f1e70d83534f Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 18 Sep 2008 06:40:07 +0000
Subject: [PATCH] Fix for issue 3477 : OpenDS runs out of synchronization or crashes

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |   43 +++++++++++++++++++++++++++++--------------
 1 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a99ad1b..f85e133 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
   private int maxSendDelay = 0;
-  private int maxQueueSize = 10000;
+  private int maxQueueSize = 5000;
+  private int maxQueueBytesSize = maxQueueSize * 100;
   private int restartReceiveQueue;
   private int restartSendQueue;
   private int restartReceiveDelay;
@@ -115,8 +116,8 @@
   private Semaphore sendWindow;
   private int sendWindowSize;
   private boolean flowControl = false; // indicate that the server is
-                                       // flow controled and should
-                                       // be stopped from sending messsages.
+                                       // flow controlled and should
+                                       // be stopped from sending messages.
   private int saturationCount = 0;
   private short replicationServerId;
 
@@ -165,6 +166,7 @@
     super("Server Handler");
     this.session = session;
     this.maxQueueSize = queueSize;
+    this.maxQueueBytesSize = queueSize * 100;
     this.protocolVersion = ProtocolVersion.currentVersion();
   }
 
@@ -305,7 +307,7 @@
         session.publish(myStartMsg);
         sendWindowSize = receivedMsg.getWindowSize();
 
-        /* Until here session is encrypted then it depends on the negociation */
+        /* Until here session is encrypted then it depends on the negotiation */
         if (!sslEncryption)
         {
           session.stopEncryption();
@@ -352,7 +354,7 @@
         }
         else
         {
-          // We are an empty Replicationserver
+          // We are an empty Replication Server
           if ((generationId>0)&&(!serverState.isEmpty()))
           {
             // If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
   {
     synchronized (msgQueue)
     {
-      int size = msgQueue.size();
+      int size = msgQueue.count();
 
       if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
         return true;
@@ -764,7 +766,7 @@
   {
     synchronized (msgQueue)
     {
-      int queueSize = msgQueue.size();
+      int queueSize = msgQueue.count();
       if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
         return false;
       if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
      * the number of updates to be sent is the size of the receive queue.
      */
      if (isFollowing())
-       return msgQueue.size();
+       return msgQueue.count();
      else
      {
        /*
@@ -1034,7 +1036,8 @@
       /* TODO : size should be configurable
        * and larger than max-receive-queue-size
        */
-      while (msgQueue.size() > maxQueueSize)
+      while ((msgQueue.count() > maxQueueSize) ||
+          (msgQueue.bytesCount() > maxQueueBytesSize))
       {
         setFollowing(false);
         msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
 
           // The loop below relies on the fact that it is sorted based
           // on the currentChange of each iterator to consider the next
-          // change accross all servers.
+          // change across all servers.
           // Hence it is necessary to remove and eventual add again an iterator
           // when looping in order to keep consistent the order of the
           // iterators (see ReplicationIteratorComparator.
-          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
+          while (!iteratorSortedSet.isEmpty() &&
+                 (lateQueue.count()<100) &&
+                 (lateQueue.bytesCount()<50000) )
           {
             ReplicationIterator iterator = iteratorSortedSet.first();
             iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
           {
             synchronized (msgQueue)
             {
-              if (msgQueue.size() < maxQueueSize)
+              if ((msgQueue.count() < maxQueueSize) &&
+                  (msgQueue.bytesCount() < maxQueueBytesSize))
               {
                 setFollowing(true);
               }
@@ -1203,7 +1209,7 @@
             {
               if (msgQueue.contains(msg))
               {
-                /* we finally catched up with the regular queue */
+                /* we finally catch up with the regular queue */
                 setFollowing(true);
                 lateQueue.clear();
                 UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
    * Update the serverState with the last message sent.
    *
    * @param msg the last update sent.
-   * @return boolean indicating if the update was meaningfull.
+   * @return boolean indicating if the update was meaningful.
    */
   public boolean  updateServerState(UpdateMessage msg)
   {
@@ -1599,6 +1605,15 @@
       }
     }
 
+    attributes.add(
+        new Attribute("queue-size", String.valueOf(msgQueue.count())));
+    attributes.add(
+        new Attribute(
+            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+    attributes.add(
+        new Attribute(
+            "following", String.valueOf(following)));
+
     // Deprecated
     attributes.add(new Attribute("max-waiting-changes",
                                   String.valueOf(maxQueueSize)));

--
Gitblit v1.10.0