From 89c803ce85e8a88d54544bfe35f24c9be34758a3 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 18 Sep 2008 06:40:07 +0000
Subject: [PATCH] Fix for issue 3477 : OpenDS runs out of synchronization or crashes

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java |   57 ++++++++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 14ec358..d27daca 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@
   //
   private final LinkedList<UpdateMessage> msgQueue =
     new LinkedList<UpdateMessage>();
+
+  // The High and low water mark for the max size of the msgQueue.
+  // the threads calling add() method will be blocked if the size of
+  // msgQueue becomes larger than the  queueHimark and will resume
+  // only when the size of the msgQueue goes below queueLowmark.
+  int queueHimark = 5000;
+  int queueLowmark = 4000;
+
+  // The queue himark and lowmark in bytes, this is set to 100 times the
+  // himark and lowmark in number of updates.
+  int queueHimarkBytes = 100 * queueHimark;
+  int queueLowmarkBytes = 100 * queueLowmark;
+
+  // The number of bytes currently in the queue
+  int queueByteSize = 0;
+
   private ReplicationDB db;
   private ChangeNumber firstChange = null;
   private ChangeNumber lastChange = null;
@@ -93,14 +109,6 @@
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
-
-  // The High and low water mark for the max size of the msgQueue.
-  // the threads calling add() method will be blocked if the size of
-  // msgQueue becomes larger than the  MSG_QUEUE_HIMARK and will resume
-  // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
-  final static int MSG_QUEUE_HIMARK = 5000;
-  final static int MSG_QUEUE_LOWMARK = 4000;
-
   // The maximum number of retries in case of DatabaseDeadlock Exception.
   private static final int DEADLOCK_RETRIES = 10;
 
@@ -120,16 +128,22 @@
    * @param replicationServer The ReplicationServer that creates this dbHandler.
    * @param dbenv the Database Env to use to create the ReplicationServer DB.
    * server for this domain.
+   * @param queueSize The queueSize to use when creating the dbHandler.
    * @throws DatabaseException If a database problem happened
    */
-  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
-      ReplicationDbEnv dbenv)
+  public DbHandler(
+      short id, DN baseDn, ReplicationServer replicationServer,
+      ReplicationDbEnv dbenv, int queueSize)
          throws DatabaseException
   {
     this.replicationServer = replicationServer;
-    this.serverId = id;
+    serverId = id;
     this.baseDn = baseDn;
-    this.trimage = replicationServer.getTrimage();
+    trimage = replicationServer.getTrimage();
+    queueHimark = queueSize;
+    queueLowmark = queueSize * 4 / 5;
+    queueHimarkBytes = 100 * queueHimark;
+    queueLowmarkBytes = 100 * queueLowmark;
     db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
     firstChange = db.readFirstChange();
     lastChange = db.readLastChange();
@@ -156,7 +170,7 @@
     synchronized (msgQueue)
     {
       int size = msgQueue.size();
-      while (size > MSG_QUEUE_HIMARK)
+      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
       {
         try
         {
@@ -168,6 +182,7 @@
         size = msgQueue.size();
       }
 
+      queueByteSize += update.size();
       msgQueue.add(update);
       if (lastChange == null || lastChange.older(update.getChangeNumber()))
       {
@@ -308,10 +323,12 @@
       int current = 0;
       while ((current < number) && (!msgQueue.isEmpty()))
       {
-        msgQueue.remove();
+        UpdateMessage msg = msgQueue.remove();
+        queueByteSize -= msg.size();
         current++;
       }
-      if (msgQueue.size() < MSG_QUEUE_LOWMARK)
+      if ((msgQueue.size() < queueLowmark) &&
+          (queueByteSize < queueLowmarkBytes))
         msgQueue.notify();
     }
   }
@@ -475,13 +492,14 @@
   private void flush()
   {
     int size;
+    int chunksize = (500 < queueHimark ? 500 : queueHimark);
 
     do
     {
       synchronized(flushLock)
       {
         // get N messages to save in the DB
-        List<UpdateMessage> changes = getChanges(500);
+        List<UpdateMessage> changes = getChanges(chunksize);
 
         // if no more changes to save exit immediately.
         if ((changes == null) || ((size = changes.size()) == 0))
@@ -493,7 +511,7 @@
         // remove the changes from the list of changes to be saved.
         clearQueue(changes.size());
       }
-    } while (size >=500);
+    } while (size >= chunksize);
   }
 
   /**
@@ -529,6 +547,10 @@
         attributes.add(new Attribute("last-change",
             lastChange.toString() + " " + lastTime.toString()));
       }
+      attributes.add(
+          new Attribute("queue-size", String.valueOf(msgQueue.size())));
+      attributes.add(
+          new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
 
       return attributes;
     }
@@ -604,6 +626,7 @@
     synchronized(flushLock)
     {
       msgQueue.clear();
+      queueByteSize = 0;
     }
     db.clear();
     firstChange = db.readFirstChange();

--
Gitblit v1.10.0