From bcf686add35bda4a6ac5c3d085abe151ea018e8e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 14 Jan 2009 08:29:50 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/replication/server/DbHandler.java | 128 ++++++++++++++++++++++++------------------
1 files changed, 72 insertions(+), 56 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 097638d..1f21de3 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -86,13 +86,15 @@
// the threads calling add() method will be blocked if the size of
// msgQueue becomes larger than the queueHimark and will resume
// only when the size of the msgQueue goes below queueLowmark.
- int queueHimark = 5000;
- int queueLowmark = 4000;
+ int queueMaxSize = 5000;
+ int queueLowmark = 1000;
+ int queueHimark = 4000;
// The queue himark and lowmark in bytes, this is set to 100 times the
// himark and lowmark in number of updates.
- int queueHimarkBytes = 100 * queueHimark;
+ int queueMaxBytes = 100 * queueMaxSize;
int queueLowmarkBytes = 100 * queueLowmark;
+ int queueHimarkBytes = 100 * queueHimark;
// The number of bytes currently in the queue
int queueByteSize = 0;
@@ -140,10 +142,12 @@
serverId = id;
this.baseDn = baseDn;
trimage = replicationServer.getTrimage();
- queueHimark = queueSize;
- queueLowmark = queueSize * 4 / 5;
- queueHimarkBytes = 100 * queueHimark;
- queueLowmarkBytes = 100 * queueLowmark;
+ queueMaxSize = queueSize;
+ queueLowmark = queueSize * 1 / 5;
+ queueHimark = queueSize * 4 / 5;
+ queueMaxBytes = 200 * queueMaxSize;
+ queueLowmarkBytes = 200 * queueLowmark;
+ queueHimarkBytes = 200 * queueLowmark;
db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
firstChange = db.readFirstChange();
lastChange = db.readLastChange();
@@ -171,11 +175,14 @@
synchronized (msgQueue)
{
int size = msgQueue.size();
- while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+ if ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+ msgQueue.notify();
+
+ while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes))
{
try
{
- msgQueue.wait(500);
+ msgQueue.wait(5000);
} catch (InterruptedException e)
{
// simply loop to try again.
@@ -379,17 +386,22 @@
{
while (shutdown == false)
{
- try {
+ try
+ {
flush();
trim();
- synchronized (this)
+ synchronized (msgQueue)
{
- try
+ if ((msgQueue.size() < queueLowmark) &&
+ (queueByteSize < queueLowmarkBytes))
{
- this.wait(1000);
- } catch (InterruptedException e)
- { }
+ try
+ {
+ msgQueue.wait(10000);
+ } catch (InterruptedException e)
+ { }
+ }
}
} catch (Exception end)
{
@@ -434,56 +446,59 @@
int tries = 0;
while ((tries++ < DEADLOCK_RETRIES) && (!done))
{
- /* the trim is done by group in order to save some CPU and IO bandwidth
- * start the transaction then do a bunch of remove then commit
- */
- ReplServerDBCursor cursor;
- cursor = db.openDeleteCursor();
-
- try
+ synchronized (flushLock)
{
- while ((size < 5000 ) && (!finished))
+ /* the trim is done by group in order to save some CPU and IO bandwidth
+ * start the transaction then do a bunch of remove then commit
+ */
+ ReplServerDBCursor cursor;
+ cursor = db.openDeleteCursor();
+
+ try
{
- ChangeNumber changeNumber = cursor.nextChangeNumber();
- if (changeNumber != null)
+ while ((size < 5000 ) && (!finished))
{
- if ((!changeNumber.equals(lastChange))
- && (changeNumber.older(trimDate)))
+ ChangeNumber changeNumber = cursor.nextChangeNumber();
+ if (changeNumber != null)
{
- size++;
- cursor.delete();
+ if ((!changeNumber.equals(lastChange))
+ && (changeNumber.older(trimDate)))
+ {
+ size++;
+ cursor.delete();
+ }
+ else
+ {
+ firstChange = changeNumber;
+ finished = true;
+ }
}
else
- {
- firstChange = changeNumber;
finished = true;
- }
}
- else
- finished = true;
+ cursor.close();
+ done = true;
}
- cursor.close();
- done = true;
- }
- catch (DeadlockException e)
- {
- cursor.abort();
- if (tries == DEADLOCK_RETRIES)
+ catch (DeadlockException e)
{
- // could not handle the Deadlock after DEADLOCK_RETRIES tries.
- // shutdown the ReplicationServer.
+ cursor.abort();
+ if (tries == DEADLOCK_RETRIES)
+ {
+ // could not handle the Deadlock after DEADLOCK_RETRIES tries.
+ // shutdown the ReplicationServer.
+ shutdown = true;
+ throw (e);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ // mark shutdown for this db so that we don't try again to
+ // stop it from cursor.close() or methods called by cursor.close()
shutdown = true;
+ cursor.abort();
throw (e);
}
}
- catch (DatabaseException e)
- {
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
- shutdown = true;
- cursor.abort();
- throw (e);
- }
}
}
@@ -493,7 +508,7 @@
private void flush()
{
int size;
- int chunksize = (500 < queueHimark ? 500 : queueHimark);
+ int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
do
{
@@ -630,9 +645,10 @@
{
msgQueue.clear();
queueByteSize = 0;
+
+ db.clear();
+ firstChange = db.readFirstChange();
+ lastChange = db.readLastChange();
}
- db.clear();
- firstChange = db.readFirstChange();
- lastChange = db.readLastChange();
}
}
--
Gitblit v1.10.0