From fa0dd51b38d4d1c2ac738232608e9a8538c92192 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 19 Aug 2014 10:06:35 +0000
Subject: [PATCH] Forward port of checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend to support cn=changelog CR-4053
---
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 10 ++
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 13 ++--
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 13 +++-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 5 +
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 32 ++++++----
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 1
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java | 23 +++++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 32 +++++++---
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 3
9 files changed, 94 insertions(+), 38 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 1b00dce..644b994 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -53,6 +53,7 @@
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -1347,12 +1348,12 @@
* @return a non null {@link DBCursor} going from oldest to newest CSN
* @throws ChangelogException
* If a database problem happened
- * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
+ * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
*/
public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
throws ChangelogException
{
- return domainDB.getCursorFrom(baseDN, startAfterServerState);
+ return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
index ad658e4..1fe0714 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -60,6 +60,29 @@
{
/**
+ * Represents a cursor key matching strategy, which allow to choose if only
+ * the exact key must be found or if any key equals or higher should match.
+ */
+ public enum KeyMatchingStrategy {
+ /** matches only if the exact key is found. */
+ EQUAL_TO_KEY,
+ /** matches if the key or a greater key is found. */
+ GREATER_THAN_OR_EQUAL_TO_KEY
+ }
+
+ /**
+ * Represents a cursor positioning strategy, which allow to choose if the start point
+ * corresponds to the record at the provided key or the record just after the provided
+ * key.
+ */
+ public enum PositionStrategy {
+ /** start point is on the matching key. */
+ ON_MATCHING_KEY,
+ /** start point is after the matching key. */
+ AFTER_MATCHING_KEY
+ }
+
+ /**
* Getter for the current record.
*
* @return The current record.
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 5728e6a..011a7c4 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -29,6 +29,7 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
@@ -92,30 +93,33 @@
void removeDomain(DN baseDN) throws ChangelogException;
/**
- * Generates a {@link DBCursor} across all the domains starting after the
+ * Generates a {@link DBCursor} across all the domains starting at or after the
* provided {@link MultiDomainServerState} for each domain.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
* cursor.
*
- * @param startAfterState
+ * @param startState
* Starting point for each domain cursor. If any {@link ServerState}
* for a domain is null, then start from the oldest CSN for each
* replicaDBs
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, ServerState)
+ * @see #getCursorFrom(DN, ServerState, PositionStrategy)
*/
- public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
throws ChangelogException;
// serverId methods
/**
* Generates a {@link DBCursor} across all the replicaDBs for the specified
- * replication domain starting after the provided {@link ServerState} for each
+ * replication domain starting at or after the provided {@link ServerState} for each
* replicaDBs.
* <p>
* When the cursor is not used anymore, client code MUST call the
@@ -124,21 +128,24 @@
*
* @param baseDN
* the replication domain baseDN
- * @param startAfterState
+ * @param startState
* Starting point for each ReplicaDB cursor. If any CSN for a
* replicaDB is null, then start from the oldest CSN for this
* replicaDB
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, int, CSN)
+ * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
throws ChangelogException;
/**
* Generates a {@link DBCursor} for one replicaDB for the specified
- * replication domain and serverId starting after the provided {@link CSN}.
+ * replication domain and serverId starting at or after the provided {@link CSN}.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
@@ -148,14 +155,17 @@
* the replication domain baseDN of the replicaDB
* @param serverId
* the serverId of the replicaDB
- * @param startAfterCSN
+ * @param startCSN
* Starting point for the ReplicaDB cursor. If the CSN is null, then
* start from the oldest CSN for this replicaDB
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
throws ChangelogException;
/**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 9e9b6df..04cfbb3 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -42,6 +42,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -335,7 +336,7 @@
}
}
- nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+ nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
nextChangeForInsertDBCursor.next();
if (newestRecord != null)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index c249a78..26c40f7 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -54,6 +54,8 @@
*/
private static final CSN NULL_CSN = new CSN(0, 0, 0);
+ private final PositionStrategy positionStrategy;
+
/**
* Builds a DomainDBCursor instance.
*
@@ -61,11 +63,15 @@
* the replication domain baseDN of this cursor
* @param domainDB
* the DB for the provided replication domain
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
*/
- public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+ public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
{
this.baseDN = baseDN;
this.domainDB = domainDB;
+ this.positionStrategy = positionStrategy;
}
/**
@@ -102,8 +108,9 @@
final Entry<Integer, CSN> pair = iter.next();
final int serverId = pair.getKey();
final CSN csn = pair.getValue();
- final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
- final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
+ final DBCursor<UpdateMsg> cursor =
+ domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
addCursor(cursor, null);
iter.remove();
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index dc23a7e..ab55875 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -36,6 +36,7 @@
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.util.Reject;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
@@ -46,6 +47,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
@@ -696,37 +698,38 @@
/** {@inheritDoc} */
@Override
- public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
- cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
+ cursor.addDomain(baseDN, startState.getServerState(baseDN));
}
return cursor;
}
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
- throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
for (int serverId : getDomainMap(baseDN).keySet())
{
// get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
cursor.addReplicaDB(serverId, lastCSN);
}
return cursor;
}
- private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
{
synchronized (registeredDomainCursors)
{
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
if (cursors == null)
{
@@ -753,15 +756,18 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
- throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
+ PositionStrategy positionStrategy) throws ChangelogException
+
{
+ Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
+ + " is not supported for the JE implementation fo changelog");
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
final DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startAfterCSN);
- final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ replicaDB.generateCursorFrom(startCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
// TODO JNR if (offlineCSN != null) ??
// What about replicas that suddenly become offline?
return new ReplicaOfflineCursor(cursor, offlineCSN);
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index cf88679..a7f067f 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -50,15 +50,21 @@
private final ConcurrentSkipListSet<DN> removeDomains =
new ConcurrentSkipListSet<DN>();
+ private final PositionStrategy positionStrategy;
+
/**
* Builds a MultiDomainDBCursor instance.
*
* @param domainDB
* the replication domain management DB
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
*/
- public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+ public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
{
this.domainDB = domainDB;
+ this.positionStrategy = positionStrategy;
}
/**
@@ -86,7 +92,7 @@
final Entry<DN, ServerState> entry = iter.next();
final DN baseDN = entry.getKey();
final ServerState serverState = entry.getValue();
- final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+ final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
addCursor(domainDBCursor, baseDN);
iter.remove();
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 23c9318..ae9b0f0 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -54,6 +54,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
* Test for ChangeNumberIndexer class. All dependencies to the changelog DB
@@ -158,7 +159,7 @@
{
MockitoAnnotations.initMocks(this);
- multiDomainCursor = new MultiDomainDBCursor(domainDB);
+ multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
initialState = new ChangelogState();
initialCookie = new MultiDomainServerState();
replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
@@ -167,8 +168,8 @@
when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
- when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
- multiDomainCursor);
+ when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
+ .thenReturn(multiDomainCursor);
}
@AfterMethod
@@ -596,15 +597,15 @@
DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
if (domainDBCursor == null)
{
- domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+ domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
domainDBCursors.put(baseDN, domainDBCursor);
multiDomainCursor.addDomain(baseDN, null);
- when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+ when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
.thenReturn(domainDBCursor);
}
domainDBCursor.addReplicaDB(serverId, null);
- when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+ when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
.thenReturn(replicaDBCursor);
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index da8bcd0..e5b022a 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -187,6 +187,7 @@
of(msg4, baseDN1));
}
+ @Test
public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
--
Gitblit v1.10.0