From 5cbf2ca5558c4e978745f7d8b9197ba978faf1cb Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 14:51:06 +0000
Subject: [PATCH] Fix failure of replication Dependency unit test by rethinking ReplicationIterator
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java | 76 +++++++++++++++++++++++++++++++++-----
1 files changed, 66 insertions(+), 10 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index 6e0079b..795f7cf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -40,6 +40,9 @@
{
private UpdateMsg currentChange = null;
private ReplServerDBCursor cursor = null;
+ private DbHandler dbh;
+ private ReplicationDB db;
+ ChangeNumber lastNonNullCurrentCN;
/**
* Creates a new ReplicationIterator.
@@ -49,18 +52,40 @@
* @param id the Identifier of the server on which the iterator applies.
* @param db The db where the iterator must be created.
* @param changeNumber The ChangeNumber after which the iterator must start.
+ * @param dbh The associated DbHandler.
* @throws Exception If there is no other change to push after change
* with changeNumber number.
* @throws DatabaseException if a database problem happened.
*/
public ReplicationIterator(
- short id, ReplicationDB db, ChangeNumber changeNumber)
+ short id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
throws Exception, DatabaseException
{
- cursor = db.openReadCursor(changeNumber);
+ this.db = db;
+ this.dbh = dbh;
+ this.lastNonNullCurrentCN = changeNumber;
+
+ try
+ {
+ cursor = db.openReadCursor(changeNumber);
+ }
+ catch(Exception e)
+ {
+ // we didn't find it in the db
+ cursor = null;
+ }
+
if (cursor == null)
{
- throw new Exception("no new change");
+ // flush the queue into the db
+ dbh.flush();
+
+ // look again in the db
+ cursor = db.openReadCursor(changeNumber);
+ if (cursor == null)
+ {
+ throw new Exception("no new change");
+ }
}
}
@@ -80,18 +105,47 @@
*/
public boolean next()
{
- currentChange = cursor.next();
+ boolean hasNext = false;
+
+ currentChange = cursor.next(); // can return null
if (currentChange != null)
- return true;
+ {
+ lastNonNullCurrentCN = currentChange.getChangeNumber();
+ hasNext = true;
+ }
else
{
- // TODO : should check here if some changes are still in the
- // dbHandler message queue and not yet saved to the backing database
- // if yes should get change from there from now on.
- return false;
+ synchronized (this)
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+ dbh.flush();
+ try
+ {
+ cursor = db.openReadCursor(lastNonNullCurrentCN);
+ currentChange = cursor.next(); // can return null
+ lastNonNullCurrentCN = currentChange.getChangeNumber();
+ if (currentChange != null)
+ {
+ hasNext = true;
+ }
+ else
+ {
+ hasNext = false;
+ }
+ }
+ catch(Exception e)
+ {
+ currentChange = null;
+ hasNext = false;
+ }
+ }
}
-
+ return hasNext;
}
/**
@@ -108,6 +162,8 @@
cursor.close();
cursor = null;
}
+ this.dbh = null;
+ this.db = null;
}
}
--
Gitblit v1.10.0