From 66f48672bc7953e77364a9a7ae41f1e70d83534f Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 18 Sep 2008 06:40:07 +0000
Subject: [PATCH] Fix for issue 3477 : OpenDS runs out of synchronization or crashes

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                     |   15 +++
 opends/src/server/org/opends/server/replication/server/MsgQueue.java                              |   18 ++++
 opends/src/server/org/opends/server/replication/protocol/AddMsg.java                              |    9 ++
 opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java                       |    7 +
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                         |   43 +++++++---
 opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java                           |   11 ++
 opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java                           |   12 +++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java |    6 +
 opends/src/server/org/opends/server/replication/server/DbHandler.java                             |   57 ++++++++++----
 opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java                      |    4 
 opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java                         |   11 ++
 11 files changed, 156 insertions(+), 37 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index c6044c9..db5dab6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -297,4 +297,13 @@
   {
     return parentUniqueId;
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int size()
+  {
+    return encodedAttributes.length + 40;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
index 28deccd..ef24139 100644
--- a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -122,4 +122,15 @@
   {
     return ("DEL " + getDn() + " " + getChangeNumber());
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int size()
+  {
+    // The DeleteMsg size is mostly dependent on the DN and should never
+    // grow very large. It is therefore safe to assume an average of 40 bytes.
+    return 40;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index aede5ee..9dac7f6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -360,4 +360,15 @@
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int size()
+  {
+    // The MODDN message size are mainly dependent on the
+    // size of the DN. let's assume that they average on 100 bytes
+    return 100;
+  }
+
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 0b0abda..d5397b8 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -235,4 +235,16 @@
   {
     return("MOD " + getDn() + " " + getChangeNumber());
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int size()
+  {
+    // The ModifyMsh can be very large when added or deleted attribute
+    // values are very large. We therefore need to count the
+    // whole encoded msg.
+    return encodedMsg.length;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
index cdc2a50..224190e 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -362,4 +362,11 @@
     }
 
   }
+
+  /**
+   * Return the number of bytes used by this message.
+   *
+   * @return The number of bytes used by this message.
+   */
+  public abstract int size();
 }
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 14ec358..d27daca 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@
   //
   private final LinkedList<UpdateMessage> msgQueue =
     new LinkedList<UpdateMessage>();
+
+  // The High and low water mark for the max size of the msgQueue.
+  // the threads calling add() method will be blocked if the size of
+  // msgQueue becomes larger than the  queueHimark and will resume
+  // only when the size of the msgQueue goes below queueLowmark.
+  int queueHimark = 5000;
+  int queueLowmark = 4000;
+
+  // The queue himark and lowmark in bytes, this is set to 100 times the
+  // himark and lowmark in number of updates.
+  int queueHimarkBytes = 100 * queueHimark;
+  int queueLowmarkBytes = 100 * queueLowmark;
+
+  // The number of bytes currently in the queue
+  int queueByteSize = 0;
+
   private ReplicationDB db;
   private ChangeNumber firstChange = null;
   private ChangeNumber lastChange = null;
@@ -93,14 +109,6 @@
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
-
-  // The High and low water mark for the max size of the msgQueue.
-  // the threads calling add() method will be blocked if the size of
-  // msgQueue becomes larger than the  MSG_QUEUE_HIMARK and will resume
-  // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
-  final static int MSG_QUEUE_HIMARK = 5000;
-  final static int MSG_QUEUE_LOWMARK = 4000;
-
   // The maximum number of retries in case of DatabaseDeadlock Exception.
   private static final int DEADLOCK_RETRIES = 10;
 
@@ -120,16 +128,22 @@
    * @param replicationServer The ReplicationServer that creates this dbHandler.
    * @param dbenv the Database Env to use to create the ReplicationServer DB.
    * server for this domain.
+   * @param queueSize The queueSize to use when creating the dbHandler.
    * @throws DatabaseException If a database problem happened
    */
-  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
-      ReplicationDbEnv dbenv)
+  public DbHandler(
+      short id, DN baseDn, ReplicationServer replicationServer,
+      ReplicationDbEnv dbenv, int queueSize)
          throws DatabaseException
   {
     this.replicationServer = replicationServer;
-    this.serverId = id;
+    serverId = id;
     this.baseDn = baseDn;
-    this.trimage = replicationServer.getTrimage();
+    trimage = replicationServer.getTrimage();
+    queueHimark = queueSize;
+    queueLowmark = queueSize * 4 / 5;
+    queueHimarkBytes = 100 * queueHimark;
+    queueLowmarkBytes = 100 * queueLowmark;
     db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
     firstChange = db.readFirstChange();
     lastChange = db.readLastChange();
@@ -156,7 +170,7 @@
     synchronized (msgQueue)
     {
       int size = msgQueue.size();
-      while (size > MSG_QUEUE_HIMARK)
+      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
       {
         try
         {
@@ -168,6 +182,7 @@
         size = msgQueue.size();
       }
 
+      queueByteSize += update.size();
       msgQueue.add(update);
       if (lastChange == null || lastChange.older(update.getChangeNumber()))
       {
@@ -308,10 +323,12 @@
       int current = 0;
       while ((current < number) && (!msgQueue.isEmpty()))
       {
-        msgQueue.remove();
+        UpdateMessage msg = msgQueue.remove();
+        queueByteSize -= msg.size();
         current++;
       }
-      if (msgQueue.size() < MSG_QUEUE_LOWMARK)
+      if ((msgQueue.size() < queueLowmark) &&
+          (queueByteSize < queueLowmarkBytes))
         msgQueue.notify();
     }
   }
@@ -475,13 +492,14 @@
   private void flush()
   {
     int size;
+    int chunksize = (500 < queueHimark ? 500 : queueHimark);
 
     do
     {
       synchronized(flushLock)
       {
         // get N messages to save in the DB
-        List<UpdateMessage> changes = getChanges(500);
+        List<UpdateMessage> changes = getChanges(chunksize);
 
         // if no more changes to save exit immediately.
         if ((changes == null) || ((size = changes.size()) == 0))
@@ -493,7 +511,7 @@
         // remove the changes from the list of changes to be saved.
         clearQueue(changes.size());
       }
-    } while (size >=500);
+    } while (size >= chunksize);
   }
 
   /**
@@ -529,6 +547,10 @@
         attributes.add(new Attribute("last-change",
             lastChange.toString() + " " + lastTime.toString()));
       }
+      attributes.add(
+          new Attribute("queue-size", String.valueOf(msgQueue.size())));
+      attributes.add(
+          new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
 
       return attributes;
     }
@@ -604,6 +626,7 @@
     synchronized(flushLock)
     {
       msgQueue.clear();
+      queueByteSize = 0;
     }
     db.clear();
     firstChange = db.readFirstChange();
diff --git a/opends/src/server/org/opends/server/replication/server/MsgQueue.java b/opends/src/server/org/opends/server/replication/server/MsgQueue.java
index d8b7295..3484633 100644
--- a/opends/src/server/org/opends/server/replication/server/MsgQueue.java
+++ b/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -42,6 +42,9 @@
   private SortedMap<ChangeNumber, UpdateMessage>  map =
     new TreeMap<ChangeNumber, UpdateMessage>();
 
+  // The total number of bytes for all the message in the queue.
+  private int bytesCount = 0;
+
   /**
    * Return the first UpdateMessage in the MsgQueue.
    *
@@ -67,12 +70,22 @@
    *
    * @return The number of elements in this MsgQueue.
    */
-  public int size()
+  public int count()
   {
     return map.size();
   }
 
   /**
+   * Returns the number of bytes in this MsgQueue.
+   *
+   * @return The number of bytes in this MsgQueue.
+   */
+  public int bytesCount()
+  {
+    return bytesCount;
+  }
+
+  /**
    * Returns <tt>true</tt> if this MsgQueue contains no UpdateMessage.
    *
    * @return <tt>true</tt> if this MsgQueue contains no UpdateMessage.
@@ -91,6 +104,7 @@
   public void add(UpdateMessage update)
   {
     map.put(update.getChangeNumber(), update);
+    bytesCount += update.size();
   }
 
   /**
@@ -102,6 +116,7 @@
   {
     UpdateMessage msg = map.get(map.firstKey());
     map.remove(msg.getChangeNumber());
+    bytesCount -= msg.size();
     return msg;
   }
 
@@ -126,5 +141,6 @@
   public void clear()
   {
     map.clear();
+    bytesCount = 0;
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index 48603d5..cca4d50 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -257,7 +257,9 @@
               + " serverId=" + serverId);
 
           DbHandler dbHandler =
-            new DbHandler(serverId, baseDn, replicationServer, this);
+            new DbHandler(
+                serverId, baseDn, replicationServer, this,
+                replicationServer.getQueueSize());
 
           replicationServer.getReplicationServerDomain(baseDn, true).
           setDbHandler(serverId, dbHandler);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 393a531..5fe6a1e 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -134,7 +134,7 @@
   // ID of the backend
   private static final String backendId = "replicationChanges";
 
-  // At startup, the listen thread wait on this flag for the connet
+  // At startup, the listen thread wait on this flag for the connect
   // thread to look for other servers in the topology.
   private boolean connectedInTopology = false;
   private final Object connectedInTopologyLock = new Object();
@@ -554,7 +554,7 @@
   public DbHandler newDbHandler(short id, DN baseDn)
   throws DatabaseException
   {
-    return new DbHandler(id, baseDn, this, dbEnv);
+    return new DbHandler(id, baseDn, this, dbEnv, queueSize);
   }
 
   /**
@@ -827,6 +827,17 @@
   }
 
   /**
+   * Get the queueSize for this replication server.
+   *
+   * @return The maximum size of the queues for this Replication Server
+   *
+   */
+  public int getQueueSize()
+  {
+    return queueSize;
+  }
+
+  /**
    * Creates the backend associated to this replication server.
    * @throws ConfigException
    */
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a99ad1b..f85e133 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
   private int maxSendDelay = 0;
-  private int maxQueueSize = 10000;
+  private int maxQueueSize = 5000;
+  private int maxQueueBytesSize = maxQueueSize * 100;
   private int restartReceiveQueue;
   private int restartSendQueue;
   private int restartReceiveDelay;
@@ -115,8 +116,8 @@
   private Semaphore sendWindow;
   private int sendWindowSize;
   private boolean flowControl = false; // indicate that the server is
-                                       // flow controled and should
-                                       // be stopped from sending messsages.
+                                       // flow controlled and should
+                                       // be stopped from sending messages.
   private int saturationCount = 0;
   private short replicationServerId;
 
@@ -165,6 +166,7 @@
     super("Server Handler");
     this.session = session;
     this.maxQueueSize = queueSize;
+    this.maxQueueBytesSize = queueSize * 100;
     this.protocolVersion = ProtocolVersion.currentVersion();
   }
 
@@ -305,7 +307,7 @@
         session.publish(myStartMsg);
         sendWindowSize = receivedMsg.getWindowSize();
 
-        /* Until here session is encrypted then it depends on the negociation */
+        /* Until here session is encrypted then it depends on the negotiation */
         if (!sslEncryption)
         {
           session.stopEncryption();
@@ -352,7 +354,7 @@
         }
         else
         {
-          // We are an empty Replicationserver
+          // We are an empty Replication Server
           if ((generationId>0)&&(!serverState.isEmpty()))
           {
             // If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
   {
     synchronized (msgQueue)
     {
-      int size = msgQueue.size();
+      int size = msgQueue.count();
 
       if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
         return true;
@@ -764,7 +766,7 @@
   {
     synchronized (msgQueue)
     {
-      int queueSize = msgQueue.size();
+      int queueSize = msgQueue.count();
       if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
         return false;
       if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
      * the number of updates to be sent is the size of the receive queue.
      */
      if (isFollowing())
-       return msgQueue.size();
+       return msgQueue.count();
      else
      {
        /*
@@ -1034,7 +1036,8 @@
       /* TODO : size should be configurable
        * and larger than max-receive-queue-size
        */
-      while (msgQueue.size() > maxQueueSize)
+      while ((msgQueue.count() > maxQueueSize) ||
+          (msgQueue.bytesCount() > maxQueueBytesSize))
       {
         setFollowing(false);
         msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
 
           // The loop below relies on the fact that it is sorted based
           // on the currentChange of each iterator to consider the next
-          // change accross all servers.
+          // change across all servers.
           // Hence it is necessary to remove and eventual add again an iterator
           // when looping in order to keep consistent the order of the
           // iterators (see ReplicationIteratorComparator.
-          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
+          while (!iteratorSortedSet.isEmpty() &&
+                 (lateQueue.count()<100) &&
+                 (lateQueue.bytesCount()<50000) )
           {
             ReplicationIterator iterator = iteratorSortedSet.first();
             iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
           {
             synchronized (msgQueue)
             {
-              if (msgQueue.size() < maxQueueSize)
+              if ((msgQueue.count() < maxQueueSize) &&
+                  (msgQueue.bytesCount() < maxQueueBytesSize))
               {
                 setFollowing(true);
               }
@@ -1203,7 +1209,7 @@
             {
               if (msgQueue.contains(msg))
               {
-                /* we finally catched up with the regular queue */
+                /* we finally catch up with the regular queue */
                 setFollowing(true);
                 lateQueue.clear();
                 UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
    * Update the serverState with the last message sent.
    *
    * @param msg the last update sent.
-   * @return boolean indicating if the update was meaningfull.
+   * @return boolean indicating if the update was meaningful.
    */
   public boolean  updateServerState(UpdateMessage msg)
   {
@@ -1599,6 +1605,15 @@
       }
     }
 
+    attributes.add(
+        new Attribute("queue-size", String.valueOf(msgQueue.count())));
+    attributes.add(
+        new Attribute(
+            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+    attributes.add(
+        new Attribute(
+            "following", String.valueOf(following)));
+
     // Deprecated
     attributes.add(new Attribute("max-waiting-changes",
                                   String.valueOf(maxQueueSize)));
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index f2a1021..05ceca0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,8 @@
     ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
 
     DbHandler handler =
-      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
+      new DbHandler(
+          (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
 
     ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
     ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +154,8 @@
     ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
 
     DbHandler handler =
-      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
+      new DbHandler(
+          (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
 
     // Creates changes added to the dbHandler
     ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);

--
Gitblit v1.10.0