From 137d1b4ba1992acdd880b61b1a03dc31f0cc1839 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Apr 2014 14:19:01 +0000
Subject: [PATCH] OPENDJ-1448 Remove JReplicaDB flushing thread and msg queue
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 13 -
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java | 16 +-
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 11 -
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 19 ++
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 213 +-----------------------------
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 44 +-----
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 29 ----
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 34 ++--
8 files changed, 65 insertions(+), 314 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index f702e8b..51abace 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -66,8 +66,6 @@
return csn;
}
-
-
/**
* Creates a message from a provided byte array.
*
@@ -113,11 +111,7 @@
}
}
-
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
@@ -141,4 +135,10 @@
}
}
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 6ff04ad..a765e8c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -947,17 +947,6 @@
}
/**
- * Get the queueSize for this replication server.
- *
- * @return The maximum size of the queues for this Replication Server
- *
- */
- public int getQueueSize()
- {
- return this.config.getQueueSize();
- }
-
- /**
* Creates the backend associated to this replication server.
*/
private void createBackend() throws ConfigException
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 6c22e5c..fcf0ad4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -62,8 +62,6 @@
private static int NO_KEY = 0;
private DraftCNDB db;
- /** FIXME What is this field used for? */
- private volatile long oldestChangeNumber = NO_KEY;
/**
* The newest changenumber stored in the DB. It is used to avoid purging the
* record with the newest changenumber. The newest record in the changenumber
@@ -96,14 +94,11 @@
public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
{
db = new DraftCNDB(dbEnv);
- final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
- oldestChangeNumber = getChangeNumber(oldestRecord);
- final long newestCN = getChangeNumber(newestRecord);
- newestChangeNumber = newestCN;
+ newestChangeNumber = getChangeNumber(newestRecord);
// initialization of the lastGeneratedChangeNumber from the DB content
// if DB is empty => last record does not exist => default to 0
- lastGeneratedChangeNumber = new AtomicLong(newestCN);
+ lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
// Monitoring registration
DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -117,7 +112,7 @@
{
return record.getChangeNumber();
}
- return 0;
+ return NO_KEY;
}
/** {@inheritDoc} */
@@ -198,19 +193,11 @@
*/
public void shutdown()
{
- if (shutdown.get())
+ if (shutdown.compareAndSet(false, true))
{
- return;
+ db.shutdown();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
-
- shutdown.set(true);
- synchronized (this)
- {
- notifyAll();
- }
-
- db.shutdown();
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
/**
@@ -236,11 +223,6 @@
while (!mustShutdown(shutdown) && cursor.next())
{
final ChangeNumberIndexRecord record = cursor.currentRecord();
- if (record.getChangeNumber() != oldestChangeNumber)
- {
- oldestChangeNumber = record.getChangeNumber();
- }
-
if (record.getChangeNumber() != newestChangeNumber
&& record.getCSN().isOlderThan(purgeCSN))
{
@@ -293,14 +275,9 @@
final DraftCNDBCursor cursor = db.openDeleteCursor();
try
{
- boolean isOldestRecord = true;
while (!mustShutdown(shutdown) && cursor.next())
{
final ChangeNumberIndexRecord record = cursor.currentRecord();
- if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
- {
- oldestChangeNumber = record.getChangeNumber();
- }
if (record.getChangeNumber() == newestChangeNumber)
{
// do not purge the newest record to avoid having the last generated
@@ -312,10 +289,6 @@
{
cursor.delete();
}
- else
- {
- isOldestRecord = false;
- }
}
}
catch (ChangelogException e)
@@ -398,7 +371,7 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + ": " + oldestChangeNumber + " "
+ return getClass().getSimpleName() + ", newestChangeNumber="
+ newestChangeNumber;
}
@@ -411,8 +384,7 @@
public void clear() throws ChangelogException
{
db.clear();
- oldestChangeNumber = getChangeNumber(db.readFirstRecord());
- newestChangeNumber = getChangeNumber(db.readLastRecord());
+ newestChangeNumber = NO_KEY;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index b1e2d63..dd337e5 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -385,11 +385,13 @@
if (indexer != null)
{
indexer.initiateShutdown();
+ indexer.interrupt();
}
final ChangelogDBPurger purger = cnPurger.getAndSet(null);
if (purger != null)
{
purger.initiateShutdown();
+ purger.interrupt();
}
try
@@ -417,6 +419,23 @@
if (dbEnv != null)
{
+ // wait for shutdown of the threads holding cursors
+ try
+ {
+ if (indexer != null)
+ {
+ indexer.join();
+ }
+ if (purger != null)
+ {
+ purger.join();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing: we are already shutting down
+ }
+
dbEnv.shutdown();
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index bbde1d1..19406bd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@
import java.util.ArrayList;
import java.util.Date;
-import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
@@ -52,8 +48,6 @@
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.util.StaticUtils.*;
/**
* This class is used for managing the replicationServer database for each
@@ -67,7 +61,7 @@
* <p>
* This class publish some monitoring information below cn=monitor.
*/
-public class JEReplicaDB implements Runnable
+public class JEReplicaDB
{
/**
@@ -89,29 +83,7 @@
}
- /**
- * The msgQueue holds all the updates not yet saved to stable storage.
- * <p>
- * This blocking queue is only used as a temporary placeholder so that the
- * write in the stable storage can be grouped for efficiency reason. Adding an
- * update synchronously add the update to this list. A dedicated thread
- * flushes this blocking queue.
- * <p>
- * Changes are not read back by replicationServer threads that are responsible
- * for pushing the changes to other replication server or to LDAP server
- */
- private final LinkedBlockingQueue<UpdateMsg> msgQueue =
- new LinkedBlockingQueue<UpdateMsg>();
-
- /**
- * Semaphore used to limit the number of bytes used in memory by the queue.
- * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
- * size of msgQueue becomes larger than the available permits and will resume
- * only when the number of available permits allow it.
- */
- private final Semaphore queueSizeBytes;
- private final int queueMaxBytes;
-
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
private ReplicationDB db;
/**
* Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -122,12 +94,6 @@
private int serverId;
private DN baseDN;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private DirectoryThread thread;
- /**
- * Used to prevent race conditions between threads calling {@link #flush()}.
- * This can happen with the thread flushing the queue, or else on shutdown.
- */
- private final Object flushLock = new Object();
private ReplicationServer replicationServer;
/**
@@ -147,15 +113,8 @@
this.replicationServer = replicationServer;
this.serverId = serverId;
this.baseDN = baseDN;
- queueMaxBytes = replicationServer.getQueueSize() * 200;
- queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
- thread = new DirectoryThread(this, "Replication server RS("
- + replicationServer.getServerId()
- + ") flusher thread for Replica DS(" + serverId
- + ") for domain \"" + baseDN + "\"");
- thread.start();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -174,34 +133,14 @@
*/
public void add(UpdateMsg updateMsg) throws ChangelogException
{
- if (thread.isShutdownInitiated())
+ if (shutdown.get())
{
throw new ChangelogException(
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
.toString(), String.valueOf(baseDN), String.valueOf(serverId)));
}
- final int msgSize = updateMsg.size();
- if (msgSize < queueMaxBytes)
- {
- try
- {
- queueSizeBytes.acquire(msgSize);
- }
- catch (InterruptedException e)
- {
- throw new ChangelogException(
- ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
- .toString(), String.valueOf(baseDN), String.valueOf(serverId),
- stackTraceToSingleLineString(e)));
- }
- }
- else
- {
- // edge case with a very large message
- collectAllPermits();
- }
- msgQueue.add(updateMsg);
+ db.addEntry(updateMsg);
final CSNLimits limits = csnLimits;
final boolean updateNew = limits.newestCSN == null
@@ -215,22 +154,6 @@
}
}
- /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
- private void collectAllPermits()
- {
- int collectedPermits = queueSizeBytes.drainPermits();
- while (collectedPermits != queueMaxBytes)
- {
- // Do not use Thread.sleep() because:
- // 1) it is expected the permits will be released very soon
- // 2) we want to collect all the permits, so do not leave a chance to
- // other threads to steal them from us.
- // 3) we want to keep low latency
- Thread.yield();
- collectedPermits += queueSizeBytes.drainPermits();
- }
- }
-
/**
* Get the oldest CSN that has not been purged yet.
*
@@ -289,80 +212,10 @@
*/
public void shutdown()
{
- if (thread.isShutdownInitiated())
+ if (shutdown.compareAndSet(false, true))
{
- return;
- }
-
- thread.initiateShutdown();
-
- while (msgQueue.size() != 0)
- {
- try
- {
- flush();
- }
- catch (ChangelogException e)
- {
- // We are already shutting down
- logError(e.getMessageObject());
- }
- }
-
- db.shutdown();
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- }
-
- /**
- * Flushes the replicaDB queue from memory to stable storage.
- */
- @Override
- public void run()
- {
- thread.startWork();
-
- try
- {
- while (!thread.isShutdownInitiated())
- {
- try
- {
- flush();
- }
- catch (ChangelogException end)
- {
- stop(end);
- break;
- }
- }
-
- try
- {
- // call flush a last time before exiting to make sure that
- // no change was forgotten in the msgQueue
- flush();
- }
- catch (ChangelogException e)
- {
- stop(e);
- }
- }
- finally
- {
- thread.stopWork();
- }
- }
-
- private void stop(Exception e)
- {
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
-
- thread.initiateShutdown();
-
- if (replicationServer != null)
- {
- replicationServer.shutdown();
+ db.shutdown();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -401,7 +254,7 @@
{
for (int j = 0; j < 50; j++)
{
- if (thread.isShutdownInitiated())
+ if (shutdown.get())
{
return;
}
@@ -428,7 +281,7 @@
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- thread.initiateShutdown();
+ shutdown.set(true);
throw e;
}
finally
@@ -439,47 +292,6 @@
}
/**
- * Flush a number of updates from the memory list to the stable storage.
- * <p>
- * Flush is done by chunk sized to 500 messages, starting from the beginning
- * of the list.
- * <p>
- * @GuardedBy("flushLock")
- * @throws ChangelogException
- * If a database problem happened
- */
- private void flush() throws ChangelogException
- {
- try
- {
- synchronized (flushLock)
- {
- final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
- if (change == null)
- {
- // nothing to persist, check if shutdown was invoked
- return;
- }
-
- // Try to see if there are more changes and persist them all.
- final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
- changes.add(change);
- msgQueue.drainTo(changes);
-
- int totalSize = db.addEntries(changes);
- // do not release more than queue max size permits
- // (be careful of the edge case with the very large message)
- queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
- }
- }
- catch (InterruptedException e)
- {
- throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
- }
- }
-
- /**
* This internal class is used to implement the Monitoring capabilities of the
* ReplicaDB.
*/
@@ -501,9 +313,6 @@
{
create(attributes, "last-change", encode(limits.newestCSN));
}
- create(attributes, "queue-size", String.valueOf(msgQueue.size()));
- create(attributes, "queue-size-bytes",
- String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
return attributes;
}
@@ -552,8 +361,6 @@
*/
public void clear() throws ChangelogException
{
- collectAllPermits();
- msgQueue.clear(); // this call should not do anything at all
db.clear();
csnLimits = new CSNLimits(null, null);
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 818fba5..a1dd6ec 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -28,7 +28,6 @@
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
-import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -182,15 +181,14 @@
}
/**
- * add a list of changes to the underlying db.
+ * add one change to the underlying db.
*
- * @param changes
- * The list of changes to add to the underlying db.
- * @return the total size of all the changes
+ * @param change
+ * The change to add to the underlying db.
* @throws ChangelogException
* If a database problem happened
*/
- public int addEntries(List<UpdateMsg> changes) throws ChangelogException
+ public void addEntry(UpdateMsg change) throws ChangelogException
{
dbCloseLock.readLock().lock();
try
@@ -198,26 +196,22 @@
// If the DB has been closed then return immediately.
if (isDBClosed())
{
- return 0;
+ return;
}
- int totalSize = 0;
- for (UpdateMsg change : changes)
- {
- final DatabaseEntry key = createReplicationKey(change.getCSN());
- final DatabaseEntry data = new ReplicationData(change);
+ final DatabaseEntry key = createReplicationKey(change.getCSN());
+ final DatabaseEntry data = new ReplicationData(change);
- insertCounterRecordIfNeeded(change.getCSN());
- db.put(null, key, data);
- counterCurrValue++;
-
- totalSize += change.size();
- }
- return totalSize;
+ insertCounterRecordIfNeeded(change.getCSN());
+ db.put(null, key, data);
+ counterCurrValue++;
}
catch (DatabaseException e)
{
- throw new ChangelogException(e);
+ throw new ChangelogException(
+ ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(
+ change.toString(), String.valueOf(baseDN),
+ String.valueOf(serverId), stackTraceToSingleLineString(e)));
}
finally
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 10aad5f..788087b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -48,8 +48,7 @@
import static org.testng.Assert.*;
/**
- * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : -
- * periodic trim - call to clear method()
+ * Test the JEChangeNumberIndexDB class.
*/
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
@@ -71,13 +70,13 @@
* <li>create the db</li>
* <li>add records</li>
* <li>read them with a cursor</li>
- * <li>set a very short trim period</li>
- * <li>wait for the db to be trimmed / here since the changes are not stored
+ * <li>set a very short purge period</li>
+ * <li>wait for the db to be purged / here since the changes are not stored
* in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
* </ol>
*/
@Test
- void testPurge() throws Exception
+ public void testPurge() throws Exception
{
ReplicationServer replicationServer = null;
try
@@ -172,8 +171,8 @@
* <li>clear the db</li>
* </ol>
*/
- @Test()
- void testClear() throws Exception
+ @Test
+ public void testClear() throws Exception
{
ReplicationServer replicationServer = null;
try
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 851afa2..e431d97 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,8 +97,6 @@
//--
// Iterator tests with changes persisted
- waitChangesArePersisted(replicaDB, 3);
-
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
assertNotFound(replicaDB, csns[4]);
@@ -108,7 +106,6 @@
//--
// Cursor tests with changes persisted
replicaDB.add(update4);
- waitChangesArePersisted(replicaDB, 4);
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
// Test cursor from existing CSN
@@ -141,29 +138,6 @@
}
}
- private void waitChangesArePersisted(JEReplicaDB replicaDB,
- int nbRecordsInserted) throws Exception
- {
- waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
- }
-
- private void waitChangesArePersisted(JEReplicaDB replicaDB,
- int nbRecordsInserted, int counterWindow) throws Exception
- {
- // one counter record is inserted every time "counterWindow"
- // records have been inserted
- int expectedNbRecords =
- nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
-
- int count = 0;
- while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
- {
- Thread.sleep(10);
- count++;
- }
- assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
- }
-
static CSN[] newCSNs(int serverId, long timestamp, int number)
{
CSNGenerator gen = new CSNGenerator(serverId, timestamp);
@@ -301,7 +275,6 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
}
}
- waitChangesArePersisted(replicaDB, 4);
cursor = replicaDB.generateCursorFrom(csns[0]);
assertTrue(cursor.next());
@@ -378,7 +351,6 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
mySeqnum+=2;
}
- waitChangesArePersisted(replicaDB, max, counterWindow);
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -402,7 +374,6 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
mySeqnum+=2;
}
- waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
--
Gitblit v1.10.0