From cd092a8cf0e5ff9b6b0df1c56e3ddfdd86a4aeaa Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 14:51:06 +0000
Subject: [PATCH] Fix failure of replication Dependency unit test by rethinking ReplicationIterator
---
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java | 76 +++++++++++++--
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java | 139 ++++++++++++++++++++++++++-
opends/src/server/org/opends/server/replication/server/DbHandler.java | 67 ++++---------
3 files changed, 222 insertions(+), 60 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 4ed5700..ca05642 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,7 +35,6 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
-import java.util.NoSuchElementException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -47,7 +45,6 @@
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
@@ -123,7 +120,6 @@
*
*/
private long trimage;
- private static final DebugTracer TRACER = getTracer();
/**
* Creates a new dbHandler associated to a given LDAP server.
@@ -206,8 +202,8 @@
/**
* Get some changes out of the message queue of the LDAP server.
- *
- * @param number the number of messages to extract.
+ * (from the beginning of the queue)
+ * @param number the maximum number of messages to extract.
* @return a List containing number changes extracted from the queue.
*/
private List<UpdateMsg> getChanges(int number)
@@ -281,50 +277,17 @@
public ReplicationIterator generateIterator(ChangeNumber changeNumber)
throws DatabaseException, Exception
{
- /*
- * When we create an iterator we need to make sure that we
- * don't miss some changes because the iterator is created
- * close to the limit of the changed that have not yet been
- * flushed to the database.
- * We detect this by comparing the date of the changeNumber where
- * we want to start with the date of the first ChangeNumber
- * of the msgQueue.
- * If this is the case we flush the queue to the database.
- */
- ChangeNumber recentChangeNumber = null;
-
if (changeNumber == null)
{
flush();
}
- synchronized (msgQueue)
- {
- try
- {
- UpdateMsg msg = msgQueue.getLast();
- recentChangeNumber = msg.getChangeNumber();
- }
- catch (NoSuchElementException e)
- {}
- }
-
- if ( (recentChangeNumber != null) && (changeNumber != null))
- {
- if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
- ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
- {
- flush();
- }
- }
-
ReplicationIterator it =
- new ReplicationIterator(serverId, db, changeNumber);
-
+ new ReplicationIterator(serverId, db, changeNumber, this);
return it;
}
/**
- * Removes message in a subList of the msgQueue from the msgQueue.
+ * Removes the provided number of messages from the beginning of the msgQueue.
*
* @param number the number of changes to be removed.
*/
@@ -335,7 +298,7 @@
int current = 0;
while ((current < number) && (!msgQueue.isEmpty()))
{
- UpdateMsg msg = msgQueue.remove();
+ UpdateMsg msg = msgQueue.remove(); // remove first
queueByteSize -= msg.size();
current++;
}
@@ -508,8 +471,10 @@
/**
* Flush a number of updates from the memory list to the stable storage.
+ * Flush is done by chunk sized to 500 messages, starting from the
+ * beginning of the list.
*/
- private void flush()
+ public void flush()
{
int size;
int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
@@ -518,7 +483,8 @@
{
synchronized(flushLock)
{
- // get N messages to save in the DB
+ // get N (or less) messages from the queue to save to the DB
+ // (from the beginning of the queue)
List<UpdateMsg> changes = getChanges(chunksize);
// if no more changes to save exit immediately.
@@ -528,7 +494,8 @@
// save the change to the stable storage.
db.addEntries(changes);
- // remove the changes from the list of changes to be saved.
+ // remove the changes from the list of changes to be saved
+ // (remove from the beginning of the queue)
clearQueue(changes.size());
}
} while (size >= chunksize);
@@ -668,4 +635,14 @@
{
return this.serverId;
}
+
+ /**
+ * Return the size of the msgQueue (the memory cache of the DbHandler).
+ * For test purpose.
+ * @return The memory queue size.
+ */
+ public int getQueueSize()
+ {
+ return this.msgQueue.size();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index 6e0079b..795f7cf 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -40,6 +40,9 @@
{
private UpdateMsg currentChange = null;
private ReplServerDBCursor cursor = null;
+ private DbHandler dbh;
+ private ReplicationDB db;
+ ChangeNumber lastNonNullCurrentCN;
/**
* Creates a new ReplicationIterator.
@@ -49,18 +52,40 @@
* @param id the Identifier of the server on which the iterator applies.
* @param db The db where the iterator must be created.
* @param changeNumber The ChangeNumber after which the iterator must start.
+ * @param dbh The associated DbHandler.
* @throws Exception If there is no other change to push after change
* with changeNumber number.
* @throws DatabaseException if a database problem happened.
*/
public ReplicationIterator(
- short id, ReplicationDB db, ChangeNumber changeNumber)
+ short id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
throws Exception, DatabaseException
{
- cursor = db.openReadCursor(changeNumber);
+ this.db = db;
+ this.dbh = dbh;
+ this.lastNonNullCurrentCN = changeNumber;
+
+ try
+ {
+ cursor = db.openReadCursor(changeNumber);
+ }
+ catch(Exception e)
+ {
+ // we didn't find it in the db
+ cursor = null;
+ }
+
if (cursor == null)
{
- throw new Exception("no new change");
+ // flush the queue into the db
+ dbh.flush();
+
+ // look again in the db
+ cursor = db.openReadCursor(changeNumber);
+ if (cursor == null)
+ {
+ throw new Exception("no new change");
+ }
}
}
@@ -80,18 +105,47 @@
*/
public boolean next()
{
- currentChange = cursor.next();
+ boolean hasNext = false;
+
+ currentChange = cursor.next(); // can return null
if (currentChange != null)
- return true;
+ {
+ lastNonNullCurrentCN = currentChange.getChangeNumber();
+ hasNext = true;
+ }
else
{
- // TODO : should check here if some changes are still in the
- // dbHandler message queue and not yet saved to the backing database
- // if yes should get change from there from now on.
- return false;
+ synchronized (this)
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+ dbh.flush();
+ try
+ {
+ cursor = db.openReadCursor(lastNonNullCurrentCN);
+ currentChange = cursor.next(); // can return null
+ lastNonNullCurrentCN = currentChange.getChangeNumber();
+ if (currentChange != null)
+ {
+ hasNext = true;
+ }
+ else
+ {
+ hasNext = false;
+ }
+ }
+ catch(Exception e)
+ {
+ currentChange = null;
+ hasNext = false;
+ }
+ }
}
-
+ return hasNext;
}
/**
@@ -108,6 +162,8 @@
cursor.close();
cursor = null;
}
+ this.dbh = null;
+ this.db = null;
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index ff51863..b84564e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -50,6 +50,7 @@
ReplicationServer replicationServer = null;
ReplicationDbEnv dbEnv = null;
DbHandler handler = null;
+ ReplicationIterator it = null;
try
{
TestCaseUtils.startServer();
@@ -85,22 +86,145 @@
ChangeNumber changeNumber1 = gen.newChangeNumber();
ChangeNumber changeNumber2 = gen.newChangeNumber();
ChangeNumber changeNumber3 = gen.newChangeNumber();
+ ChangeNumber changeNumber4 = gen.newChangeNumber();
+ ChangeNumber changeNumber5 = gen.newChangeNumber();
DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
"uid");
DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
"uid");
DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
- "uid");
+ "uid");
+ DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4,
+ "uid");
handler.add(update1);
handler.add(update2);
handler.add(update3);
- // The ChangeNumber should not get purged
+ //--
+ // Iterator tests with memory queue only populated
+
+ // verify that memory queue is populated
+ assertEquals(handler.getQueueSize(),3);
+
+ // Iterator from existing CN
+ it = handler.generateIterator(changeNumber1);
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
+ " Actual change number=" + it.getChange().getChangeNumber() +
+ " Expect change number=" + changeNumber2);
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertFalse(it.next());
+ it.releaseCursor();
+ it=null;
+
+ // Iterator from NON existing CN
+ Exception ec = null;
+ try
+ {
+ it = handler.generateIterator(changeNumber5);
+ }
+ catch(Exception e)
+ {
+ ec = e;
+ }
+ assertNotNull(ec);
+ assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+
+ //--
+ // Iterator tests with db only populated
+ Thread.sleep(1000); // let the time for flush to happen
+
+ // verify that memory queue is empty (all changes flushed in the db)
+ assertEquals(handler.getQueueSize(),0);
+
+ // Test iterator from existing CN
+ it = handler.generateIterator(changeNumber1);
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertFalse(it.next());
+ it.releaseCursor();
+ it=null;
+
+ // Iterator from NON existing CN
+ ec = null;
+ try
+ {
+ it = handler.generateIterator(changeNumber5);
+ }
+ catch(Exception e)
+ {
+ ec = e;
+ }
+ assertNotNull(ec);
+ assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+
+ // Test first and last
assertEquals(changeNumber1, handler.getFirstChange());
assertEquals(changeNumber3, handler.getLastChange());
+ //--
+ // Iterator tests with db and memory queue populated
+ // all changes in the db - add one in the memory queue
+ handler.add(update4);
+
+ // verify memory queue contains this one
+ assertEquals(handler.getQueueSize(),1);
+
+ // Test iterator from existing CN
+ it = handler.generateIterator(changeNumber1);
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertFalse(it.next());
+ assertTrue(it.getChange()==null);
+ it.releaseCursor();
+ it=null;
+
+ // Test iterator from existing CN at the limit between queue and db
+ it = handler.generateIterator(changeNumber3);
+ assertTrue(it.next());
+ assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
+ " Actual change number=" + it.getChange().getChangeNumber());
+ assertFalse(it.next());
+ assertTrue(it.getChange()==null);
+ it.releaseCursor();
+ it=null;
+
+ // Test iterator from existing CN at the limit between queue and db
+ it = handler.generateIterator(changeNumber4);
+ assertFalse(it.next());
+ assertTrue(it.getChange()==null,
+ " Actual change number=" + it.getChange());
+ it.releaseCursor();
+ it=null;
+
+ // Test iterator from NON existing CN
+ ec = null;
+ try
+ {
+ it = handler.generateIterator(changeNumber5);
+ }
+ catch(Exception e)
+ {
+ ec = e;
+ }
+ assertNotNull(ec);
+ assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+
handler.setPurgeDelay(1);
boolean purged = false;
@@ -109,8 +233,8 @@
{
ChangeNumber firstChange = handler.getFirstChange();
ChangeNumber lastChange = handler.getLastChange();
- if ((!firstChange.equals(changeNumber3) ||
- (!lastChange.equals(changeNumber3))))
+ if ((!firstChange.equals(changeNumber4) ||
+ (!lastChange.equals(changeNumber4))))
{
TestCaseUtils.sleep(100);
} else
@@ -120,6 +244,11 @@
}
} finally
{
+ if (it != null)
+ {
+ it.releaseCursor();
+ it=null;
+ }
if (handler != null)
handler.shutdown();
if (dbEnv != null)
--
Gitblit v1.10.0