From 4fed0daa395855cd567621b0b38d405c9af254f4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Sep 2013 15:29:01 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 15 -
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 73 +++++--
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 12
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java | 82 +++++++-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java | 205 ++++++++++++----------
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 5
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java | 15 +
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 21 +-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 8
opends/commit_message.txt | 0
opends/src/server/org/opends/server/replication/common/CSNGenerator.java | 13
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 43 ++--
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 9
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 21 +
14 files changed, 306 insertions(+), 216 deletions(-)
diff --git a/opends/commit_message.txt b/opends/commit_message.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/opends/commit_message.txt
diff --git a/opends/src/server/org/opends/server/replication/common/CSNGenerator.java b/opends/src/server/org/opends/server/replication/common/CSNGenerator.java
index 49bc844..d7d0e27 100644
--- a/opends/src/server/org/opends/server/replication/common/CSNGenerator.java
+++ b/opends/src/server/org/opends/server/replication/common/CSNGenerator.java
@@ -43,13 +43,16 @@
/**
* Create a new {@link CSNGenerator}.
- * @param serverID2 id to use when creating {@link CSN}s.
- * @param timestamp time to start with.
+ *
+ * @param serverId
+ * id to use when creating {@link CSN}s.
+ * @param timestamp
+ * time to start with.
*/
- public CSNGenerator(int serverID2, long timestamp)
+ public CSNGenerator(int serverId, long timestamp)
{
this.lastTime = timestamp;
- this.serverId = serverID2;
+ this.serverId = serverId;
this.seqnum = 0;
}
@@ -161,6 +164,6 @@
for (int localServerId : state)
{
adjust(state.getCSN(localServerId));
- }
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 06070f1..525f786 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -414,16 +414,13 @@
private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
ReplicaDBCursor cursor)
{
- if (cursor != null)
+ if (cursor.getChange() != null)
{
- if (cursor.getChange() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- close(cursor);
- }
+ cursors.add(cursor);
+ }
+ else
+ {
+ close(cursor);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index c5ec6bd..9c8c0c2 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -632,34 +632,31 @@
}
ReplicaDBCursor cursor = rsd.getCursorFrom(serverId, previousCSN);
- if (cursor != null)
+ try
{
- try
- {
- int lookthroughCount = 0;
+ int lookthroughCount = 0;
- // Walk through the changes
- while (cursor.getChange() != null)
- {
- if (exportConfig != null && exportConfig.isCancelled())
- { // abort if cancelled
- return;
- }
- if (!canContinue(searchOperation, lookthroughCount))
- {
- break;
- }
- lookthroughCount++;
- writeChange(cursor.getChange(), ldifWriter, searchOperation,
- rsd.getBaseDN(), exportConfig != null);
- cursor.next();
- }
- }
- finally
+ // Walk through the changes
+ while (cursor.getChange() != null)
{
- close(cursor);
+ if (exportConfig != null && exportConfig.isCancelled())
+ { // abort if cancelled
+ return;
+ }
+ if (!canContinue(searchOperation, lookthroughCount))
+ {
+ break;
+ }
+ lookthroughCount++;
+ writeChange(cursor.getChange(), ldifWriter, searchOperation,
+ rsd.getBaseDN(), exportConfig != null);
+ cursor.next();
}
}
+ finally
+ {
+ close(cursor);
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 044ba50..d2fbc45 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1174,6 +1174,15 @@
logError(msg);
}
}
+
+ /**
+ * Removes the changelog database directory.
+ */
+ public void removeDb()
+ {
+ this.changelogDB.removeDB();
+ }
+
/**
* {@inheritDoc}
*/
@@ -1735,18 +1744,6 @@
return this.changelogDB;
}
- /**
- * Get the replication server DB directory.
- * This is useful for tests to be able to do some cleanup. Might even be
- * useful for the server some day.
- *
- * @return the Database directory name
- */
- public String getDbDirName()
- {
- return this.changelogDB.getDBDirectoryName();
- }
-
/** {@inheritDoc} */
@Override
public String toString()
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 22303e4..fe0622a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1283,16 +1283,21 @@
}
/**
- * Creates and returns a cursor. When the cursor is not used anymore, the
- * caller MUST call the {@link ReplicaDBCursor#close()} method to free the
- * resources and locks used by the cursor.
+ * Creates and returns a cursor.
+ * <p>
+ * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
+ * to the next available record.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+ * by the cursor.
*
* @param serverId
- * Identifier of the server for which the cursor is created.
+ * Identifier of the server for which the cursor is created
* @param startAfterCSN
- * Starting point for the cursor.
- * @return the created {@link ReplicaDBCursor}. Null when no DB is available
- * or the DB is empty for the provided serverId .
+ * Starting point for the cursor. If null, start from the oldest CSN
+ * @return a non null {@link ReplicaDBCursor}
+ * @see ChangelogDB#getCursorFrom(DN, int, CSN)
*/
public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
{
@@ -2198,7 +2203,7 @@
public void clearDbs()
{
// Reset the localchange and state db for the current domain
- changelogDB.clearDomain(baseDN);
+ changelogDB.removeDomain(baseDN);
try
{
localReplicationServer.clearGenerationId(baseDN);
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index 1bc8fd1..e1dc552 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -84,6 +84,11 @@
void shutdownDB();
/**
+ * Removes the changelog database directory.
+ */
+ void removeDB();
+
+ /**
* Returns a new {@link ChangeNumberIndexDB} object.
*
* @return a new {@link ChangeNumberIndexDB} object
@@ -145,7 +150,7 @@
long getDomainLatestTrimDate(DN baseDN);
/**
- * Shutdown the specified replication domain.
+ * Shutdown all the replica databases for the specified replication domain.
*
* @param baseDN
* the replication domain baseDN
@@ -153,27 +158,69 @@
void shutdownDomain(DN baseDN);
/**
- * Clear DB and shutdown for the specified replication domain.
+ * Removes all the data relating to the specified replication domain and
+ * shutdown all its replica databases. In particular, it will:
+ * <ol>
+ * <li>remove all the changes from the replica databases</li>
+ * <li>remove all knowledge of the serverIds in this domain</li>
+ * <li>remove any knowledge of the current generationId for this domain</li>
+ * </ol>
*
* @param baseDN
* the replication domain baseDN
*/
- void clearDomain(DN baseDN);
+ void removeDomain(DN baseDN);
// serverId methods
/**
- * Return the number of changes between 2 provided {@link CSN}s for the
- * specified serverId and replication domain.
+ * Return the number of changes inclusive between 2 provided {@link CSN}s for
+ * the specified serverId and replication domain. i.e. the <code>from</code>
+ * and <code>to</code> CSNs are included in the count.
+ * <p>
+ * Note that:
+ * <ol>
+ * <li>If <code>from</code> is null, the count starts at the oldest CSN in the
+ * database.</li>
+ * <li>If <code>to</code> is null, the count is 0.</li>
+ * <li>If both from and to are present, then the count includes them both
+ * <code>to</code> is null, the count ends at the newest CSN in the database.
+ * </li>
+ * <li>incidentally, if both <code>from</code> and <code>to</code> are null,
+ * the total count of entries in the replica database is returned.</li>
+ * </ol>
+ * <h6>Example</h6>
+ * <p>
+ * Given the following replica database for baseDN "dc=example,dc=com" and
+ * serverId 1:
+ *
+ * <pre>
+ * CSN1 <= Oldest
+ * CSN2
+ * CSN3
+ * CSN4
+ * CSN5 <= Newest
+ * </pre>
+ *
+ * Then:
+ *
+ * <pre>
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1);
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2);
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5);
+ * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5);
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0);
+ * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5);
+ * </pre>
*
* @param baseDN
* the replication domain baseDN
* @param serverId
* the serverId on which to act
* @param from
- * The lower (older) CSN
+ * The older CSN where to start the count
* @param to
- * The upper (newer) CSN
+ * The newer CSN where to end the count
* @return The computed number of changes
*/
long getCount(DN baseDN, int serverId, CSN from, CSN to);
@@ -188,23 +235,28 @@
* the serverId for which we want the information
* @param startAfterCSN
* The position where the iterator must start
- * @return a new ReplicationIterator that allows to browse the db managed by
- * this dbHandler and starting at the position defined by a given CSN.
+ * @return the CSN immediately after startAfterCSN, or null if no CSN exist
+ * after startAfterCSN
*/
CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN);
/**
- * Generates a non empty {@link ReplicaDBCursor} for the specified serverId
- * and replication domain.
+ * Generates a {@link ReplicaDBCursor} for the specified serverId and
+ * replication domain starting after the provided CSN.
+ * <p>
+ * The cursor is already advanced to the record after startAfterCSN.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+ * by the cursor.
*
* @param baseDN
* the replication domain baseDN
* @param serverId
- * the serverId on which to act
+ * Identifier of the server for which the cursor is created
* @param startAfterCSN
- * The position where the iterator must start
- * @return a {@link ReplicaDBCursor} if the ReplicaDB is not empty, null
- * otherwise
+ * Starting point for the cursor. If null, start from the oldest CSN
+ * @return a non null {@link ReplicaDBCursor}
*/
ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 9f57083..eeb52b2 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -263,7 +263,8 @@
* by this dbHandler and starting at the position defined by a given CSN.
*
* @param startAfterCSN
- * The position where the cursor must start.
+ * The position where the cursor must start. If null, start from the
+ * oldest CSN
* @return a new {@link ReplicaDBCursor} that allows to browse the db managed
* by this dbHandler and starting at the position defined by a given
* CSN.
@@ -634,12 +635,16 @@
}
/**
- * Set the counter writing window size (public for unit tests only).
- * @param size Size in number of record.
+ * Set the window size for writing counter records in the DB.
+ * <p>
+ * for unit tests only!!
+ *
+ * @param size
+ * window size in number of records.
*/
- public void setCounterWindowSize(int size)
+ void setCounterRecordWindowSize(int size)
{
- db.setCounterWindowSize(size);
+ db.setCounterRecordWindowSize(size);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index ab85ba2..742dd06 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -45,6 +45,7 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.Pair;
+import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -72,6 +73,37 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
+ private static final ReplicaDBCursor EMPTY_CURSOR = new ReplicaDBCursor()
+ {
+
+ @Override
+ public int compareTo(ReplicaDBCursor o)
+ {
+ if (o == null)
+ {
+ throw new NullPointerException(); // as per javadoc
+ }
+ return o == this ? 0 : -1; // equal to self, but less than all the rest
+ }
+
+ @Override
+ public boolean next()
+ {
+ return false;
+ }
+
+ @Override
+ public UpdateMsg getChange()
+ {
+ return null;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ };
+
/**
* Builds an instance of this class.
*
@@ -220,6 +252,13 @@
/** {@inheritDoc} */
@Override
+ public void removeDB()
+ {
+ StaticUtils.recursiveDelete(dbDirectory);
+ }
+
+ /** {@inheritDoc} */
+ @Override
public Set<Integer> getDomainServerIds(DN baseDN)
{
return getDomainMap(baseDN).keySet();
@@ -298,7 +337,7 @@
/** {@inheritDoc} */
@Override
- public void clearDomain(DN baseDN)
+ public void removeDomain(DN baseDN)
{
final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
synchronized (domainMap)
@@ -410,28 +449,20 @@
CSN startAfterCSN)
{
DbHandler dbHandler = getDbHandler(baseDN, serverId);
- if (dbHandler == null)
+ if (dbHandler != null)
{
- return null;
+ try
+ {
+ ReplicaDBCursor cursor = dbHandler.generateCursorFrom(startAfterCSN);
+ cursor.next();
+ return cursor;
+ }
+ catch (ChangelogException e)
+ {
+ // ignored
+ }
}
-
- ReplicaDBCursor cursor;
- try
- {
- cursor = dbHandler.generateCursorFrom(startAfterCSN);
- }
- catch (Exception e)
- {
- return null;
- }
-
- if (!cursor.next())
- {
- close(cursor);
- return null;
- }
-
- return cursor;
+ return EMPTY_CURSOR;
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index f06a43d..7b98f6b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -27,7 +27,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import org.opends.messages.Message;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -39,8 +38,8 @@
*/
public class JEReplicaDBCursor implements ReplicaDBCursor
{
- private UpdateMsg currentChange = null;
- private ReplServerDBCursor cursor = null;
+ private UpdateMsg currentChange;
+ private ReplServerDBCursor cursor;
private DbHandler dbHandler;
private ReplicationDB db;
private CSN lastNonNullCurrentCSN;
@@ -52,7 +51,8 @@
* @param db
* The db where the cursor must be created.
* @param startAfterCSN
- * The CSN after which the cursor must start.
+ * The CSN after which the cursor must start.If null, start from the
+ * oldest CSN
* @param dbHandler
* The associated DbHandler.
* @throws ChangelogException
@@ -82,10 +82,6 @@
// look again in the db
cursor = db.openReadCursor(startAfterCSN);
- if (cursor == null)
- {
- throw new ChangelogException(Message.raw("no new change"));
- }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 78a2167..d2fe3b8 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -286,7 +286,8 @@
* ReplicationServer DB.
*
* @param startCSN
- * The CSN from which the cursor must start.
+ * The CSN from which the cursor must start.If null, start from the
+ * oldest CSN
* @throws ChangelogException
* When a problem occurs or the startCSN does not exist.
* @return The ReplServerDBCursor.
@@ -1178,7 +1179,7 @@
* Set the counter writing window size (public method for unit tests only).
* @param size Size in number of record.
*/
- public void setCounterWindowSize(int size)
+ public void setCounterRecordWindowSize(int size)
{
this.counterWindowSize = size;
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index cedcebc..de7b4a9 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -27,7 +27,6 @@
*/
package org.opends.server.replication;
-import java.io.File;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -446,7 +445,7 @@
entryList = new LinkedList<DN>();
// Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
- // (in case our test created some emtries in it)
+ // (in case our test created some entries in it)
TestCaseUtils.initializeTestBackend(true);
// Check for unexpected replication config/objects left
@@ -509,7 +508,7 @@
*/
protected void removeReplicationServerDB() {
for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
- recursiveDelete(new File(DirectoryServer.getInstanceRoot(), rs.getDbDirName()));
+ rs.removeDb();
}
}
@@ -519,9 +518,8 @@
{
if (rs != null)
{
- rs.clearDb();
rs.remove();
- recursiveDelete(new File(DirectoryServer.getInstanceRoot(), rs.getDbDirName()));
+ rs.removeDb();
}
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 87cdf24..7a6c9b4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -984,12 +984,10 @@
{
callParanoiaCheck = false;
super.classCleanUp();
- String dirName = replicationServer.getDbDirName();
+ replicationServer.getChangelogDB().removeDB();
shutdown();
- recursiveDelete(new File(DirectoryServer.getInstanceRoot(), dirName));
-
paranoiaCheck();
}
@@ -1366,8 +1364,7 @@
assertTrue(b.isIndexed(filter));
List<Control> requestControls = new LinkedList<Control>();
- requestControls.add(new LDAPControl(OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE,
- false));
+ requestControls.add(new LDAPControl(OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE, false));
DN baseDN=DN.decode("dc=replicationChanges");
//Test the group membership control causes search to be skipped.
InternalSearchOperation internalSearch =
@@ -1383,7 +1380,7 @@
// is currently failing when run in the nightly target.
// anonymous search returns entries from replication backend whereas it
// should not. Probably a previous test in the nightlytests suite is
- // removing/modifying some ACIs...When problem foound, we have to re-enable
+ // removing/modifying some ACIs...When problem found, we have to re-enable
// this test.
// testReplicationBackendACIs();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
index 4ec800f..7e5e55e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -40,6 +40,7 @@
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -206,11 +207,11 @@
ReplicaDBCursor cursor = handler.generateCursorFrom(csns[0]);
try
{
+ assertNull(cursor.getChange());
for (int i = 1; i < csns.length; i++)
{
assertTrue(cursor.next());
- final CSN csn = cursor.getChange().getCSN();
- assertEquals(csn, csns[i]);
+ assertEquals(cursor.getChange().getCSN(), csns[i]);
}
assertFalse(cursor.next());
assertNull(cursor.getChange(), "Actual change=" + cursor.getChange()
@@ -230,7 +231,7 @@
cursor = handler.generateCursorFrom(csn);
fail("Expected exception");
}
- catch (Exception e)
+ catch (ChangelogException e)
{
assertEquals(e.getLocalizedMessage(), "CSN not available");
}
@@ -296,6 +297,50 @@
}
}
+ @Test
+ public void testGetCountNoCounterRecords() throws Exception
+ {
+ File testRoot = null;
+ ReplicationServer replicationServer = null;
+ ReplicationDbEnv dbEnv = null;
+ DbHandler handler = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100000);
+
+ testRoot = createCleanDir();
+ dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
+ handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
+
+ CSNGenerator csnGen = new CSNGenerator(1, System.currentTimeMillis());
+ CSN[] csns = new CSN[5];
+ for (int i = 0; i < 5; i++)
+ {
+ csns[i] = csnGen.newCSN();
+ handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+ }
+ handler.flush();
+
+ assertEquals(handler.getCount(csns[0], csns[0]), 1);
+ assertEquals(handler.getCount(csns[0], csns[1]), 2);
+ assertEquals(handler.getCount(csns[0], csns[4]), 5);
+ assertEquals(handler.getCount(null, csns[4]), 5);
+ assertEquals(handler.getCount(csns[0], null), 0);
+ assertEquals(handler.getCount(null, null), 5);
+ }
+ finally
+ {
+ if (handler != null)
+ handler.shutdown();
+ if (dbEnv != null)
+ dbEnv.shutdown();
+ if (replicationServer != null)
+ replicationServer.remove();
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+ }
+
/**
* Test the logic that manages counter records in the DbHandler in order to
* optimize the counting of record in the replication changelog db.
@@ -346,89 +391,59 @@
ReplicationServer replicationServer = null;
ReplicationDbEnv dbEnv = null;
DbHandler handler = null;
- long actualCnt = 0;
- String testcase;
try
{
TestCaseUtils.startServer();
-
replicationServer = configureReplicationServer(100000);
testRoot = createCleanDir();
dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
- handler.setCounterWindowSize(counterWindow);
+ handler.setCounterRecordWindowSize(counterWindow);
// Populate the db with 'max' msg
int mySeqnum = 1;
- CSN csnArray[] = new CSN[2 * (max + 1)];
+ CSN csns[] = new CSN[2 * (max + 1)];
long now = System.currentTimeMillis();
for (int i=1; i<=max; i++)
{
- csnArray[i] = new CSN(now + i, mySeqnum, 1);
+ csns[i] = new CSN(now + i, mySeqnum, 1);
mySeqnum+=2;
- DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN, csnArray[i], "uid");
+ DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN, csns[i], "uid");
handler.add(update1);
}
handler.flush();
// Test first and last
- CSN csn1 = handler.getOldestCSN();
- assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
- CSN csnLast = handler.getNewestCSN();
- assertEquals(csnLast, csnArray[max], "Wrong newest CSN");
+ CSN oldestCSN = handler.getOldestCSN();
+ assertEquals(oldestCSN, csns[1], "Wrong oldest CSN");
+ CSN newestCSN = handler.getNewestCSN();
+ assertEquals(newestCSN, csns[max], "Wrong newest CSN");
// Test count in different subcases trying to handle all special cases
// regarding the 'counter' record and 'count' algorithm
- testcase="FROM change1 TO change1 ";
- actualCnt = handler.getCount(csnArray[1], csnArray[1]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, 1, testcase);
+ assertCount(tn, handler, csns[1], csns[1], 1, "FROM change1 TO change1 ");
+ assertCount(tn, handler, csns[1], csns[2], 2, "FROM change1 TO change2 ");
+ assertCount(tn, handler, csns[1], csns[counterWindow], counterWindow,
+ "FROM change1 TO counterWindow=" + counterWindow);
- testcase="FROM change1 TO change2 ";
- actualCnt = handler.getCount(csnArray[1], csnArray[2]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, 2, testcase);
-
- testcase="FROM change1 TO counterWindow="+(counterWindow);
- actualCnt = handler.getCount(csnArray[1], csnArray[counterWindow]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, counterWindow, testcase);
-
- testcase="FROM change1 TO counterWindow+1="+(counterWindow+1);
- actualCnt = handler.getCount(csnArray[1], csnArray[counterWindow+1]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, counterWindow+1, testcase);
-
- testcase="FROM change1 TO 2*counterWindow="+(2*counterWindow);
- actualCnt = handler.getCount(csnArray[1], csnArray[2*counterWindow]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, 2*counterWindow, testcase);
-
- testcase="FROM change1 TO 2*counterWindow+1="+((2*counterWindow)+1);
- actualCnt = handler.getCount(csnArray[1], csnArray[(2*counterWindow)+1]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, (2*counterWindow)+1, testcase);
-
- testcase="FROM change2 TO change5 ";
- actualCnt = handler.getCount(csnArray[2], csnArray[5]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, 4, testcase);
-
- testcase="FROM counterWindow+2 TO counterWindow+5 ";
- actualCnt = handler.getCount(csnArray[(counterWindow+2)], csnArray[(counterWindow+5)]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, 4, testcase);
-
- testcase="FROM change2 TO counterWindow+5 ";
- actualCnt = handler.getCount(csnArray[2], csnArray[(counterWindow+5)]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, counterWindow+4, testcase);
-
- testcase="FROM counterWindow+4 TO counterWindow+4 ";
- actualCnt = handler.getCount(csnArray[(counterWindow+4)], csnArray[(counterWindow+4)]);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, 1, testcase);
+ final int j = counterWindow + 1;
+ assertCount(tn, handler, csns[1], csns[j], j,
+ "FROM change1 TO counterWindow+1=" + j);
+ final int k = 2 * counterWindow;
+ assertCount(tn, handler, csns[1], csns[k], k,
+ "FROM change1 TO 2*counterWindow=" + k);
+ final int l = k + 1;
+ assertCount(tn, handler, csns[1], csns[l], l,
+ "FROM change1 TO 2*counterWindow+1=" + l);
+ assertCount(tn, handler, csns[2], csns[5], 4,
+ "FROM change2 TO change5 ");
+ assertCount(tn, handler, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4,
+ "FROM counterWindow+2 TO counterWindow+5 ");
+ assertCount(tn, handler, csns[2], csns[(counterWindow + 5)], counterWindow + 4,
+ "FROM change2 TO counterWindow+5 ");
+ assertCount(tn, handler, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1,
+ "FROM counterWindow+4 TO counterWindow+4 ");
// Now test with changes older than first or newer than last
CSN olderThanFirst = null;
@@ -436,15 +451,10 @@
// Now we want to test with start and stop outside of the db
- testcase="FROM our first generated change TO now (> newest change in the db)";
- actualCnt = handler.getCount(csnArray[1], newerThanLast);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, max, testcase);
-
- testcase="FROM null (start of time) TO now (> newest change in the db)";
- actualCnt = handler.getCount(olderThanFirst, newerThanLast);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, max, testcase);
+ assertCount(tn, handler, csns[1], newerThanLast, max,
+ "FROM our first generated change TO now (> newest change in the db)");
+ assertCount(tn, handler, olderThanFirst, newerThanLast, max,
+ "FROM null (start of time) TO now (> newest change in the db)");
// Now we want to test that after closing and reopening the db, the
// counting algo is well reinitialized and when new messages are added
@@ -453,53 +463,47 @@
handler.shutdown();
handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
- handler.setCounterWindowSize(counterWindow);
+ handler.setCounterRecordWindowSize(counterWindow);
// Test first and last
- csn1 = handler.getOldestCSN();
- assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
- csnLast = handler.getNewestCSN();
- assertEquals(csnLast, csnArray[max], "Wrong newest CSN");
+ oldestCSN = handler.getOldestCSN();
+ assertEquals(oldestCSN, csns[1], "Wrong oldest CSN");
+ newestCSN = handler.getNewestCSN();
+ assertEquals(newestCSN, csns[max], "Wrong newest CSN");
- testcase="FROM our first generated change TO now (> newest change in the db)";
- actualCnt = handler.getCount(csnArray[1], newerThanLast);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, max, testcase);
+ assertCount(tn, handler, csns[1], newerThanLast, max,
+ "FROM our first generated change TO now (> newest change in the db)");
// Populate the db with 'max' msg
for (int i=max+1; i<=(2*max); i++)
{
- csnArray[i] = new CSN(now+i, mySeqnum, 1);
+ csns[i] = new CSN(now + i, mySeqnum, 1);
mySeqnum+=2;
- DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN, csnArray[i], "uid");
+ DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN, csns[i], "uid");
handler.add(update1);
}
handler.flush();
// Test first and last
- csn1 = handler.getOldestCSN();
- assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
- csnLast = handler.getNewestCSN();
- assertEquals(csnLast, csnArray[2 * max], "Wrong newest CSN");
+ oldestCSN = handler.getOldestCSN();
+ assertEquals(oldestCSN, csns[1], "Wrong oldest CSN");
+ newestCSN = handler.getNewestCSN();
+ assertEquals(newestCSN, csns[2 * max], "Wrong newest CSN");
- testcase="FROM our first generated change TO now (> newest change in the db)";
- actualCnt = handler.getCount(csnArray[1], newerThanLast);
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, (2*max), testcase);
+ assertCount(tn, handler, csns[1], newerThanLast, 2 * max,
+ "FROM our first generated change TO now (> newest change in the db)");
//
handler.setPurgeDelay(100);
sleep(4000);
long totalCount = handler.getCount(null, null);
- debugInfo(tn,testcase + " After purge, total count=" + totalCount);
+ debugInfo(tn, "FROM our first generated change TO now (> newest change in the db)" + " After purge, total count=" + totalCount);
- testcase="AFTER PURGE (first, last)=";
+ String testcase = "AFTER PURGE (first, last)=";
debugInfo(tn, testcase + handler.getOldestCSN() + handler.getNewestCSN());
- assertEquals(handler.getNewestCSN(), csnArray[2*max], "Newest=");
+ assertEquals(handler.getNewestCSN(), csns[2 * max], "Newest=");
- testcase="AFTER PURGE ";
- actualCnt = handler.getCount(csnArray[1], newerThanLast);
int expectedCnt;
if (totalCount>1)
{
@@ -511,8 +515,7 @@
{
expectedCnt = 0;
}
- debugInfo(tn,testcase + " actualCnt=" + actualCnt);
- assertEquals(actualCnt, expectedCnt, testcase);
+ assertCount(tn, handler, csns[1], newerThanLast, expectedCnt, "AFTER PURGE");
// Clear ...
debugInfo(tn,"clear:");
@@ -535,4 +538,12 @@
}
}
+ private void assertCount(String tn, DbHandler handler, CSN from, CSN to,
+ int expectedCount, String testcase)
+ {
+ long actualCount = handler.getCount(from, to);
+ debugInfo(tn, testcase + " actualCount=" + actualCount);
+ assertEquals(actualCount, expectedCount, testcase);
+ }
+
}
--
Gitblit v1.10.0