From bcf686add35bda4a6ac5c3d085abe151ea018e8e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 14 Jan 2009 08:29:50 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/replication/server/DbHandler.java |  128 ++++++++++++++++++++++++------------------
 1 files changed, 72 insertions(+), 56 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 097638d..1f21de3 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
@@ -86,13 +86,15 @@
   // the threads calling add() method will be blocked if the size of
   // msgQueue becomes larger than the  queueHimark and will resume
   // only when the size of the msgQueue goes below queueLowmark.
-  int queueHimark = 5000;
-  int queueLowmark = 4000;
+  int queueMaxSize = 5000;
+  int queueLowmark = 1000;
+  int queueHimark = 4000;
 
   // The queue himark and lowmark in bytes, this is set to 100 times the
   // himark and lowmark in number of updates.
-  int queueHimarkBytes = 100 * queueHimark;
+  int queueMaxBytes = 100 * queueMaxSize;
   int queueLowmarkBytes = 100 * queueLowmark;
+  int queueHimarkBytes = 100 * queueHimark;
 
   // The number of bytes currently in the queue
   int queueByteSize = 0;
@@ -140,10 +142,12 @@
     serverId = id;
     this.baseDn = baseDn;
     trimage = replicationServer.getTrimage();
-    queueHimark = queueSize;
-    queueLowmark = queueSize * 4 / 5;
-    queueHimarkBytes = 100 * queueHimark;
-    queueLowmarkBytes = 100 * queueLowmark;
+    queueMaxSize = queueSize;
+    queueLowmark = queueSize * 1 / 5;
+    queueHimark = queueSize * 4 / 5;
+    queueMaxBytes = 200 * queueMaxSize;
+    queueLowmarkBytes = 200 * queueLowmark;
+    queueHimarkBytes = 200 * queueLowmark;
     db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
     firstChange = db.readFirstChange();
     lastChange = db.readLastChange();
@@ -171,11 +175,14 @@
     synchronized (msgQueue)
     {
       int size = msgQueue.size();
-      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+      if ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+        msgQueue.notify();
+
+      while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes))
       {
         try
         {
-          msgQueue.wait(500);
+          msgQueue.wait(5000);
         } catch (InterruptedException e)
         {
           // simply loop to try again.
@@ -379,17 +386,22 @@
   {
     while (shutdown == false)
     {
-      try {
+      try
+      {
         flush();
         trim();
 
-        synchronized (this)
+        synchronized (msgQueue)
         {
-          try
+          if ((msgQueue.size() < queueLowmark) &&
+              (queueByteSize < queueLowmarkBytes))
           {
-            this.wait(1000);
-          } catch (InterruptedException e)
-          { }
+            try
+            {
+              msgQueue.wait(10000);
+            } catch (InterruptedException e)
+            { }
+          }
         }
       } catch (Exception end)
       {
@@ -434,56 +446,59 @@
     int tries = 0;
     while ((tries++ < DEADLOCK_RETRIES) && (!done))
     {
-      /* the trim is done by group in order to save some CPU and IO bandwidth
-       * start the transaction then do a bunch of remove then commit
-       */
-      ReplServerDBCursor cursor;
-      cursor = db.openDeleteCursor();
-
-      try
+      synchronized (flushLock)
       {
-        while ((size < 5000 ) &&  (!finished))
+        /* the trim is done by group in order to save some CPU and IO bandwidth
+         * start the transaction then do a bunch of remove then commit
+         */
+        ReplServerDBCursor cursor;
+        cursor = db.openDeleteCursor();
+
+        try
         {
-          ChangeNumber changeNumber = cursor.nextChangeNumber();
-          if (changeNumber != null)
+          while ((size < 5000 ) &&  (!finished))
           {
-            if ((!changeNumber.equals(lastChange))
-                && (changeNumber.older(trimDate)))
+            ChangeNumber changeNumber = cursor.nextChangeNumber();
+            if (changeNumber != null)
             {
-              size++;
-              cursor.delete();
+              if ((!changeNumber.equals(lastChange))
+                  && (changeNumber.older(trimDate)))
+              {
+                size++;
+                cursor.delete();
+              }
+              else
+              {
+                firstChange = changeNumber;
+                finished = true;
+              }
             }
             else
-            {
-              firstChange = changeNumber;
               finished = true;
-            }
           }
-          else
-            finished = true;
+          cursor.close();
+          done = true;
         }
-        cursor.close();
-        done = true;
-      }
-      catch (DeadlockException e)
-      {
-        cursor.abort();
-        if (tries == DEADLOCK_RETRIES)
+        catch (DeadlockException e)
         {
-          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
-          // shutdown the ReplicationServer.
+          cursor.abort();
+          if (tries == DEADLOCK_RETRIES)
+          {
+            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
+            // shutdown the ReplicationServer.
+            shutdown = true;
+            throw (e);
+          }
+        }
+        catch (DatabaseException e)
+        {
+          // mark shutdown for this db so that we don't try again to
+          // stop it from cursor.close() or methods called by cursor.close()
           shutdown = true;
+          cursor.abort();
           throw (e);
         }
       }
-      catch (DatabaseException e)
-      {
-        // mark shutdown for this db so that we don't try again to
-        // stop it from cursor.close() or methods called by cursor.close()
-        shutdown = true;
-        cursor.abort();
-        throw (e);
-      }
     }
   }
 
@@ -493,7 +508,7 @@
   private void flush()
   {
     int size;
-    int chunksize = (500 < queueHimark ? 500 : queueHimark);
+    int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
 
     do
     {
@@ -630,9 +645,10 @@
     {
       msgQueue.clear();
       queueByteSize = 0;
+
+      db.clear();
+      firstChange = db.readFirstChange();
+      lastChange = db.readLastChange();
     }
-    db.clear();
-    firstChange = db.readFirstChange();
-    lastChange = db.readLastChange();
   }
 }

--
Gitblit v1.10.0