From cd092a8cf0e5ff9b6b0df1c56e3ddfdd86a4aeaa Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 14:51:06 +0000
Subject: [PATCH] Fix failure of replication Dependency unit test by rethinking ReplicationIterator
---
opends/src/server/org/opends/server/replication/server/DbHandler.java | 67 +++++++++++----------------------
1 files changed, 22 insertions(+), 45 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 4ed5700..ca05642 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,7 +35,6 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
-import java.util.NoSuchElementException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -47,7 +45,6 @@
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
@@ -123,7 +120,6 @@
*
*/
private long trimage;
- private static final DebugTracer TRACER = getTracer();
/**
* Creates a new dbHandler associated to a given LDAP server.
@@ -206,8 +202,8 @@
/**
* Get some changes out of the message queue of the LDAP server.
- *
- * @param number the number of messages to extract.
+ * (from the beginning of the queue)
+ * @param number the maximum number of messages to extract.
* @return a List containing number changes extracted from the queue.
*/
private List<UpdateMsg> getChanges(int number)
@@ -281,50 +277,17 @@
public ReplicationIterator generateIterator(ChangeNumber changeNumber)
throws DatabaseException, Exception
{
- /*
- * When we create an iterator we need to make sure that we
- * don't miss some changes because the iterator is created
- * close to the limit of the changed that have not yet been
- * flushed to the database.
- * We detect this by comparing the date of the changeNumber where
- * we want to start with the date of the first ChangeNumber
- * of the msgQueue.
- * If this is the case we flush the queue to the database.
- */
- ChangeNumber recentChangeNumber = null;
-
if (changeNumber == null)
{
flush();
}
- synchronized (msgQueue)
- {
- try
- {
- UpdateMsg msg = msgQueue.getLast();
- recentChangeNumber = msg.getChangeNumber();
- }
- catch (NoSuchElementException e)
- {}
- }
-
- if ( (recentChangeNumber != null) && (changeNumber != null))
- {
- if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
- ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
- {
- flush();
- }
- }
-
ReplicationIterator it =
- new ReplicationIterator(serverId, db, changeNumber);
-
+ new ReplicationIterator(serverId, db, changeNumber, this);
return it;
}
/**
- * Removes message in a subList of the msgQueue from the msgQueue.
+ * Removes the provided number of messages from the beginning of the msgQueue.
*
* @param number the number of changes to be removed.
*/
@@ -335,7 +298,7 @@
int current = 0;
while ((current < number) && (!msgQueue.isEmpty()))
{
- UpdateMsg msg = msgQueue.remove();
+ UpdateMsg msg = msgQueue.remove(); // remove first
queueByteSize -= msg.size();
current++;
}
@@ -508,8 +471,10 @@
/**
* Flush a number of updates from the memory list to the stable storage.
+ * Flush is done by chunk sized to 500 messages, starting from the
+ * beginning of the list.
*/
- private void flush()
+ public void flush()
{
int size;
int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
@@ -518,7 +483,8 @@
{
synchronized(flushLock)
{
- // get N messages to save in the DB
+ // get N (or less) messages from the queue to save to the DB
+ // (from the beginning of the queue)
List<UpdateMsg> changes = getChanges(chunksize);
// if no more changes to save exit immediately.
@@ -528,7 +494,8 @@
// save the change to the stable storage.
db.addEntries(changes);
- // remove the changes from the list of changes to be saved.
+ // remove the changes from the list of changes to be saved
+ // (remove from the beginning of the queue)
clearQueue(changes.size());
}
} while (size >= chunksize);
@@ -668,4 +635,14 @@
{
return this.serverId;
}
+
+ /**
+ * Return the size of the msgQueue (the memory cache of the DbHandler).
+ * For test purpose.
+ * @return The memory queue size.
+ */
+ public int getQueueSize()
+ {
+ return this.msgQueue.size();
+ }
}
--
Gitblit v1.10.0