From 53058e19e0f41acf6303cdac234b47311556dfc5 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 28 Mar 2014 11:50:04 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3278) Re-implement changelog purging logic
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 354 ++++++++++++++++++++++----------------------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 81 ++++-----
opendj-sdk/opends/src/messages/messages/replication.properties | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 11 +
4 files changed, 227 insertions(+), 225 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 8021e90..f8bbc38 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -535,4 +535,8 @@
expected the newest change number index record CSN '%s' to be equal to \
the CSN read from the replica DBs '%s'
NOTICE_ECL_LOOKTHROUGH_LIMIT_EXCEEDED_238=This search operation has checked the \
- maximum of %d entries for matches
\ No newline at end of file
+ maximum of %d entries for matches
+SEVERE_ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB_239=Could not add \
+ change %s to replicaDB %s %s because: %s
+SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
+ change %s to replicaDB %s %s because flushing thread is shutting down
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index b85d132..4426b0f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -30,6 +30,9 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -67,13 +70,33 @@
*/
public class JEReplicaDB implements Runnable
{
+
+ /**
+ * Class that allows atomically setting oldest and newest CSNs without
+ * synchronization.
+ *
+ * @Immutable
+ */
+ private static final class CSNLimits
+ {
+ private final CSN oldestCSN;
+ private final CSN newestCSN;
+
+ public CSNLimits(CSN oldestCSN, CSN newestCSN)
+ {
+ this.oldestCSN = oldestCSN;
+ this.newestCSN = newestCSN;
+ }
+
+ }
+
/**
* The msgQueue holds all the updates not yet saved to stable storage.
* <p>
- * This list is only used as a temporary placeholder so that the write in the
- * stable storage can be grouped for efficiency reason. Adding an update
- * synchronously add the update to this list. A dedicated thread loops on
- * flush() and trim().
+ * This blocking queue is only used as a temporary placeholder so that the
+ * write in the stable storage can be grouped for efficiency reason. Adding an
+ * update synchronously add the update to this list. A dedicated thread loops
+ * on {@link #flush()} and {@link #trim()}.
* <dl>
* <dt>flush()</dt>
* <dd>get a number of changes from the in memory list by block and write them
@@ -86,37 +109,35 @@
* Changes are not read back by replicationServer threads that are responsible
* for pushing the changes to other replication server or to LDAP server
*/
- private final LinkedList<UpdateMsg> msgQueue =
- new LinkedList<UpdateMsg>();
+ private final LinkedBlockingQueue<UpdateMsg> msgQueue =
+ new LinkedBlockingQueue<UpdateMsg>();
/**
- * The High and low water mark for the max size of the msgQueue. The threads
- * calling add() method will be blocked if the size of msgQueue becomes larger
- * than the queueHimark and will resume only when the size of the msgQueue
- * goes below queueLowmark.
+ * Semaphore used to limit the number of bytes used in memory by the queue.
+ * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
+ * size of msgQueue becomes larger than the available permits and will resume
+ * only when the number of available permits allow it.
*/
- private int queueMaxSize = 5000;
- private int queueLowmark = 1000;
- private int queueHimark = 4000;
-
- /**
- * The queue himark and lowmark in bytes, this is set to 100 times the himark
- * and lowmark in number of updates.
- */
- private int queueMaxBytes = 100 * queueMaxSize;
- private int queueLowmarkBytes = 100 * queueLowmark;
- private int queueHimarkBytes = 100 * queueHimark;
-
- /** The number of bytes currently in the queue. */
- private int queueByteSize = 0;
+ private final Semaphore queueSizeBytes;
+ private final int queueMaxBytes;
private ReplicationDB db;
- private CSN oldestCSN;
- private CSN newestCSN;
+ /**
+ * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
+ *
+ * @NonNull
+ */
+ private volatile CSNLimits csnLimits;
private int serverId;
private DN baseDN;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private DirectoryThread thread;
+ /**
+ * Used to prevent race conditions between threads calling {@link #clear()}
+ * {@link #flush()} or {@link #trim()}. This can happen with the thread
+ * flushing the queue, on shutdown or on cursor opening, a thread calling
+ * clear(), etc.
+ */
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
@@ -146,16 +167,10 @@
this.serverId = serverId;
this.baseDN = baseDN;
trimAge = replicationServer.getTrimAge();
- final int queueSize = replicationServer.getQueueSize();
- queueMaxSize = queueSize;
- queueLowmark = queueSize / 5;
- queueHimark = queueSize * 4 / 5;
- queueMaxBytes = 200 * queueMaxSize;
- queueLowmarkBytes = 200 * queueLowmark;
- queueHimarkBytes = 200 * queueLowmark;
+ queueMaxBytes = replicationServer.getQueueSize() * 200;
+ queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
- oldestCSN = db.readOldestCSN();
- newestCSN = db.readNewestCSN();
+ csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
thread = new DirectoryThread(this, "Replication server RS("
+ replicationServer.getServerId()
+ ") changelog checkpointer for Replica DS(" + serverId
@@ -167,61 +182,72 @@
}
/**
- * Add an update to the list of messages that must be saved to the db
- * managed by this db handler.
- * This method is blocking if the size of the list of message is larger
- * than its maximum.
+ * Add an update to the list of messages that must be saved to the db managed
+ * by this db handler. This method is blocking if the size of the list of
+ * message is larger than its maximum.
*
- * @param update The update that must be saved to the db managed by this db
- * handler.
+ * @param updateMsg
+ * The update message that must be saved to the db managed by this db
+ * handler.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void add(UpdateMsg update)
+ public void add(UpdateMsg updateMsg) throws ChangelogException
{
- synchronized (msgQueue)
+ if (thread.isShutdownInitiated())
{
- int size = msgQueue.size();
- if (size > queueHimark || queueByteSize > queueHimarkBytes)
- {
- msgQueue.notify();
- }
+ throw new ChangelogException(
+ ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
+ .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
+ }
- while (size > queueMaxSize || queueByteSize > queueMaxBytes)
+ final int msgSize = updateMsg.size();
+ if (msgSize < queueMaxBytes)
+ {
+ try
{
- try
- {
- msgQueue.wait(500);
- } catch (InterruptedException e)
- {
- // simply loop to try again.
- }
- size = msgQueue.size();
+ queueSizeBytes.acquire(msgSize);
}
+ catch (InterruptedException e)
+ {
+ throw new ChangelogException(
+ ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
+ .toString(), String.valueOf(baseDN), String.valueOf(serverId),
+ stackTraceToSingleLineString(e)));
+ }
+ }
+ else
+ {
+ // edge case with a very large message
+ collectAllPermits();
+ }
+ msgQueue.add(updateMsg);
- queueByteSize += update.size();
- msgQueue.add(update);
- if (newestCSN == null || newestCSN.isOlderThan(update.getCSN()))
- {
- newestCSN = update.getCSN();
- }
- if (oldestCSN == null)
- {
- oldestCSN = update.getCSN();
- }
+ final CSNLimits limits = csnLimits;
+ final boolean updateNew = limits.newestCSN == null
+ || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+ final boolean updateOld = limits.oldestCSN == null;
+ if (updateOld || updateNew)
+ {
+ csnLimits = new CSNLimits(
+ updateOld ? updateMsg.getCSN() : limits.oldestCSN,
+ updateNew ? updateMsg.getCSN() : limits.newestCSN);
}
}
- /**
- * Get some changes out of the message queue of the LDAP server.
- * (from the beginning of the queue)
- * @param number the maximum number of messages to extract.
- * @return a List containing number changes extracted from the queue.
- */
- private List<UpdateMsg> getChanges(int number)
+ /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
+ private void collectAllPermits()
{
- synchronized (msgQueue)
+ int collectedPermits = queueSizeBytes.drainPermits();
+ while (collectedPermits != queueMaxBytes)
{
- final int minAvailableNb = Math.min(number, msgQueue.size());
- return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb));
+ // Do not use Thread.sleep() because:
+ // 1) it is expected the permits will be released very soon
+ // 2) we want to collect all the permits, so do not leave a chance to
+ // other threads to steal them from us.
+ // 3) we want to keep low latency
+ Thread.yield();
+ collectedPermits += queueSizeBytes.drainPermits();
}
}
@@ -232,7 +258,7 @@
*/
public CSN getOldestCSN()
{
- return oldestCSN;
+ return csnLimits.oldestCSN;
}
/**
@@ -242,7 +268,7 @@
*/
public CSN getNewestCSN()
{
- return newestCSN;
+ return csnLimits.newestCSN;
}
/**
@@ -252,9 +278,10 @@
*/
public long getChangesCount()
{
- if (newestCSN != null && oldestCSN != null)
+ final CSNLimits limits = csnLimits;
+ if (limits.newestCSN != null && limits.oldestCSN != null)
{
- return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
+ return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
}
return 0;
}
@@ -276,36 +303,13 @@
{
if (startAfterCSN == null)
{
+ // flush any potential changes before opening the cursor
flush();
}
return new JEReplicaDBCursor(db, startAfterCSN, this);
}
/**
- * Removes the provided number of messages from the beginning of the msgQueue.
- *
- * @param number the number of changes to be removed.
- */
- private void clearQueue(int number)
- {
- synchronized (msgQueue)
- {
- int current = 0;
- while (current < number && !msgQueue.isEmpty())
- {
- UpdateMsg msg = msgQueue.remove(); // remove first
- queueByteSize -= msg.size();
- current++;
- }
- if (msgQueue.size() < queueLowmark
- && queueByteSize < queueLowmarkBytes)
- {
- msgQueue.notifyAll();
- }
- }
- }
-
- /**
* Shutdown this ReplicaDB.
*/
public void shutdown()
@@ -317,11 +321,6 @@
thread.initiateShutdown();
- synchronized (msgQueue)
- {
- msgQueue.notifyAll();
- }
-
while (msgQueue.size() != 0)
{
try
@@ -331,8 +330,7 @@
catch (ChangelogException e)
{
// We are already shutting down
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
+ logError(e.getMessageObject());
}
}
@@ -358,27 +356,8 @@
{
flush();
trim();
-
- synchronized (msgQueue)
- {
- if (msgQueue.size() < queueLowmark
- && queueByteSize < queueLowmarkBytes)
- {
- try
- {
- msgQueue.wait(1000);
- }
- catch (InterruptedException e)
- {
- // Do not reset the interrupt flag here,
- // because otherwise JE will barf next time flush() is called:
- // JE 5.0.97 refuses to persist changes to the DB when invoked
- // from a Thread with the interrupt flag set to true.
- }
- }
- }
}
- catch (Exception end)
+ catch (ChangelogException end)
{
stop(end);
break;
@@ -400,11 +379,6 @@
{
thread.stopWork();
}
-
- synchronized (this)
- {
- notifyAll();
- }
}
private void stop(Exception e)
@@ -453,13 +427,34 @@
trimDate = lastBeforeTrimDate;
}
+ final int queueLowMarkBytes = queueMaxBytes / 5;
for (int i = 0; i < 100; i++)
{
+ /*
+ * Perform at least some trimming regardless of the flush backlog. Then
+ * continue trim iterations while the flush backlog is low (below the
+ * lowmark). Once the flush backlog increases, stop trimming and start
+ * flushing more eagerly.
+ */
+ if (i > 20 && msgQueue.size() < queueLowMarkBytes)
+ {
+ break;
+ }
+
synchronized (flushLock)
{
/*
- * the trim is done by group in order to save some CPU and IO bandwidth
- * start the transaction then do a bunch of remove then commit
+ * the trim is done by group in order to save some CPU, IO bandwidth and
+ * DB caches: start the transaction then do a bunch of remove then
+ * commit.
+ */
+ /*
+ * Matt wrote: The record removal is done as a DB transaction and the
+ * deleted records are only "deleted" on commit. While the txn/cursor is
+ * open the records to be deleted will, I think, be pinned in the DB
+ * cache. In other words, the larger the transaction (the more records
+ * deleted during a single batch) the more DB cache will be used to
+ * process the transaction.
*/
final ReplServerDBCursor cursor = db.openDeleteCursor();
try
@@ -477,13 +472,13 @@
return;
}
- if (!csn.equals(newestCSN) && csn.isOlderThan(trimDate))
+ if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
{
cursor.delete();
}
else
{
- oldestCSN = csn;
+ csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
return;
}
}
@@ -515,32 +510,33 @@
*/
public void flush() throws ChangelogException
{
- int size;
- int chunksize = Math.min(queueMaxSize, 500);
-
- do
+ try
{
- synchronized(flushLock)
+ synchronized (flushLock)
{
- // get N (or less) messages from the queue to save to the DB
- // (from the beginning of the queue)
- List<UpdateMsg> changes = getChanges(chunksize);
-
- // if no more changes to save exit immediately.
- if (changes == null || (size = changes.size()) == 0)
+ final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
+ final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+ if (change == null)
{
+ // nothing to persist, move on to the trim phase
return;
}
- // save the change to the stable storage.
- db.addEntries(changes);
+ // Try to see if there are more changes and persist them all.
+ changes.add(change);
+ msgQueue.drainTo(changes);
- // remove the changes from the list of changes to be saved
- // (remove from the beginning of the queue)
- clearQueue(changes.size());
+ int totalSize = db.addEntries(changes);
+ // do not release more than queue max size permits
+ // (be careful of the edge case with the very large message)
+ queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
}
- // loop while there are more changes in the queue
- } while (size == chunksize);
+ }
+ catch (InterruptedException e)
+ {
+ throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(e)));
+ }
}
/**
@@ -556,25 +552,28 @@
public List<Attribute> getMonitorData()
{
List<Attribute> attributes = new ArrayList<Attribute>();
- attributes.add(Attributes.create("replicationServer-database",
- String.valueOf(serverId)));
- attributes.add(Attributes.create("domain-name",
- baseDN.toNormalizedString()));
- if (oldestCSN != null)
+ create(attributes, "replicationServer-database", String.valueOf(serverId));
+ create(attributes, "domain-name", baseDN.toNormalizedString());
+ final CSNLimits limits = csnLimits;
+ if (limits.oldestCSN != null)
{
- attributes.add(Attributes.create("first-change", encode(oldestCSN)));
+ create(attributes, "first-change", encode(limits.oldestCSN));
}
- if (newestCSN != null)
+ if (limits.newestCSN != null)
{
- attributes.add(Attributes.create("last-change", encode(newestCSN)));
+ create(attributes, "last-change", encode(limits.newestCSN));
}
- attributes.add(
- Attributes.create("queue-size", String.valueOf(msgQueue.size())));
- attributes.add(
- Attributes.create("queue-size-bytes", String.valueOf(queueByteSize)));
+ create(attributes, "queue-size", String.valueOf(msgQueue.size()));
+ create(attributes, "queue-size-bytes",
+ String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
return attributes;
}
+ private void create(List<Attribute> attributes, String name, String value)
+ {
+ attributes.add(Attributes.create(name, value));
+ }
+
private String encode(CSN csn)
{
return csn + " " + new Date(csn.getTime());
@@ -609,8 +608,9 @@
@Override
public String toString()
{
+ final CSNLimits limits = csnLimits;
return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
- + oldestCSN + " " + newestCSN;
+ + limits.oldestCSN + " " + limits.newestCSN;
}
/**
@@ -631,12 +631,10 @@
{
synchronized(flushLock)
{
+ collectAllPermits();
msgQueue.clear();
- queueByteSize = 0;
-
db.clear();
- oldestCSN = db.readOldestCSN();
- newestCSN = db.readNewestCSN();
+ csnLimits = new CSNLimits(null, null);
}
}
@@ -655,7 +653,7 @@
* For test purpose.
* @return The memory queue size.
*/
- public int getQueueSize()
+ int getQueueSize()
{
return this.msgQueue.size();
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index afe3adc..f755ec8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -186,10 +186,11 @@
*
* @param changes
* The list of changes to add to the underlying db.
+ * @return the total size of all the changes
* @throws ChangelogException
* If a database problem happened
*/
- public void addEntries(List<UpdateMsg> changes) throws ChangelogException
+ public int addEntries(List<UpdateMsg> changes) throws ChangelogException
{
dbCloseLock.readLock().lock();
try
@@ -197,9 +198,10 @@
// If the DB has been closed then return immediately.
if (isDBClosed())
{
- return;
+ return 0;
}
+ int totalSize = 0;
for (UpdateMsg change : changes)
{
final DatabaseEntry key = createReplicationKey(change.getCSN());
@@ -208,7 +210,10 @@
insertCounterRecordIfNeeded(change.getCSN());
db.put(null, key, data);
counterCurrValue++;
+
+ totalSize += change.size();
}
+ return totalSize;
}
catch (DatabaseException e)
{
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 38704df..e17878a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -86,7 +86,7 @@
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
- JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+ final JEReplicaDB replicaDB = newReplicaDB(replicationServer);
CSN[] csns = newCSNs(1, 0, 5);
@@ -96,20 +96,8 @@
DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
//--
- // Iterator tests with memory queue only populated
-
- // verify that memory queue is populated
- assertEquals(replicaDB.getQueueSize(), 3);
-
- assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
- assertNotFound(replicaDB, csns[4]);
-
- //--
- // Iterator tests with db only populated
- Thread.sleep(1000); // let the time for flush to happen
-
- // verify that memory queue is empty (all changes flushed in the db)
- assertEquals(replicaDB.getQueueSize(), 0);
+ // Iterator tests with changes persisted
+ waitChangesArePersisted(replicaDB);
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
assertNotFound(replicaDB, csns[4]);
@@ -118,36 +106,34 @@
assertEquals(replicaDB.getNewestCSN(), csns[2]);
//--
- // Cursor tests with db and memory queue populated
- // all changes in the db - add one in the memory queue
+ // Cursor tests with changes persisted
replicaDB.add(update4);
-
- // verify memory queue contains this one
- assertEquals(replicaDB.getQueueSize(), 1);
+ waitChangesArePersisted(replicaDB);
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
- // Test cursor from existing CSN at the limit between queue and db
+ // Test cursor from existing CSN
assertFoundInOrder(replicaDB, csns[2], csns[3]);
assertFoundInOrder(replicaDB, csns[3]);
assertNotFound(replicaDB, csns[4]);
replicaDB.setPurgeDelay(1);
- boolean purged = false;
- int count = 300; // wait at most 60 seconds
- while (!purged && (count > 0))
+ int count = 0;
+ boolean purgeSucceeded = false;
+ final CSN expectedNewestCSN = csns[3];
+ do
{
- CSN oldestCSN = replicaDB.getOldestCSN();
- CSN newestCSN = replicaDB.getNewestCSN();
- if (!oldestCSN.equals(csns[3]) || !newestCSN.equals(csns[3]))
- {
- TestCaseUtils.sleep(100);
- } else
- {
- purged = true;
- }
+ Thread.sleep(10);
+
+ final CSN oldestCSN = replicaDB.getOldestCSN();
+ final CSN newestCSN = replicaDB.getNewestCSN();
+ purgeSucceeded =
+ oldestCSN.equals(expectedNewestCSN)
+ && newestCSN.equals(expectedNewestCSN);
+ count++;
}
- // FIXME should add an assert here
+ while (!purgeSucceeded && count < 100);
+ assertTrue(purgeSucceeded);
}
finally
{
@@ -155,6 +141,18 @@
}
}
+ private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception
+ {
+ final int expected = 0;
+ int count = 0;
+ while (replicaDB.getQueueSize() != expected && count < 100)
+ {
+ Thread.sleep(10);
+ count++;
+ }
+ assertEquals(replicaDB.getQueueSize(), expected);
+ }
+
static CSN[] newCSNs(int serverId, long timestamp, int number)
{
CSNGenerator gen = new CSNGenerator(serverId, timestamp);
@@ -175,10 +173,10 @@
return new ReplicationServer(conf);
}
- private JEReplicaDB newReplicaDB(ReplicationServer replicationServer) throws Exception
+ private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
{
- JEChangelogDB changelogDB = (JEChangelogDB) replicationServer.getChangelogDB();
- return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, replicationServer).getFirst();
+ final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
+ return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
}
private File createCleanDir() throws IOException
@@ -251,22 +249,19 @@
CSN[] csns = newCSNs(1, 0, 3);
- // Add the changes
+ // Add the changes and check they are here
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
- // Check they are here
assertEquals(csns[0], replicaDB.getOldestCSN());
assertEquals(csns[2], replicaDB.getNewestCSN());
- // Clear ...
+ // Clear DB and check it is cleared.
replicaDB.clear();
- // Check the db is cleared.
assertEquals(null, replicaDB.getOldestCSN());
assertEquals(null, replicaDB.getNewestCSN());
-
}
finally
{
--
Gitblit v1.10.0