From e91b3e8091cb7c814de5b5af9d1a47fbfa2d4ca0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 18 Aug 2014 15:26:23 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 123 ++++
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 115 ++-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 127 ++++
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 418 +++++----------
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 265 +++++----
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 30
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 37 +
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 365 +++----------
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 140 +++--
9 files changed, 847 insertions(+), 773 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index ad2ceb1..5728e6a 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,10 @@
package org.opends.server.replication.server.changelog.api;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
/**
@@ -89,6 +91,26 @@
*/
void removeDomain(DN baseDN) throws ChangelogException;
+ /**
+ * Generates a {@link DBCursor} across all the domains starting after the
+ * provided {@link MultiDomainServerState} for each domain.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link DBCursor#close()} method to free the resources and locks used by the
+ * cursor.
+ *
+ * @param startAfterState
+ * Starting point for each domain cursor. If any {@link ServerState}
+ * for a domain is null, then start from the oldest CSN for each
+ * replicaDBs
+ * @return a non null {@link DBCursor}
+ * @throws ChangelogException
+ * If a database problem happened
+ * @see #getCursorFrom(DN, ServerState)
+ */
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+ throws ChangelogException;
+
// serverId methods
/**
@@ -102,16 +124,17 @@
*
* @param baseDN
* the replication domain baseDN
- * @param startAfterServerState
+ * @param startAfterState
* Starting point for each ReplicaDB cursor. If any CSN for a
* replicaDB is null, then start from the oldest CSN for this
* replicaDB
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
+ * @see #getCursorFrom(DN, int, CSN)
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
- ServerState startAfterServerState) throws ChangelogException;
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+ throws ChangelogException;
/**
* Generates a {@link DBCursor} for one replicaDB for the specified
@@ -136,6 +159,14 @@
throws ChangelogException;
/**
+ * Unregisters the provided cursor from this replication domain.
+ *
+ * @param cursor
+ * the cursor to unregister.
+ */
+ void unregisterCursor(DBCursor<?> cursor);
+
+ /**
* Publishes the provided change to the changelog DB for the specified
* serverId and replication domain. After a change has been successfully
* published, it becomes available to be returned by the External ChangeLog.
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index ea4f86a..9e9b6df 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,15 +25,8 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -42,18 +35,15 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
-import org.opends.server.util.StaticUtils;
-
-import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
@@ -82,7 +72,7 @@
private ChangelogState changelogState;
/*
- * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
+ * The following MultiDomainServerState fields must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
* updates 2) many updates can happen concurrently.
*/
@@ -128,39 +118,7 @@
*
* @NonNull
*/
- @SuppressWarnings("unchecked")
- private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
- new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
-
- /**
- * New cursors for this Map must be created from the {@link #run()} method,
- * i.e. from the same thread that will make use of them. If this rule is not
- * obeyed, then a JE exception will be thrown about
- * "Non-transactional Cursors may not be used in multiple threads;".
- */
- private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
- new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
- /**
- * Holds the newCursors that will have to be created in the next iteration
- * inside the {@link #run()} method.
- * <p>
- * This map can be updated by multiple threads.
- */
- private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
- new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
- new Comparator<Pair<DN, Integer>>()
- {
- @Override
- public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
- {
- final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
- if (compareBaseDN == 0)
- {
- return o1.getSecond().compareTo(o2.getSecond());
- }
- return compareBaseDN;
- }
- });
+ private MultiDomainDBCursor nextChangeForInsertDBCursor;
/**
* Builds a ChangeNumberIndexer object.
@@ -215,11 +173,8 @@
return;
}
- final CSN csn = updateMsg.getCSN();
- // only keep the oldest CSN that will be the new cursor's starting point
- newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
final CSN oldestCSNBefore = getOldestLastAliveCSN();
- lastAliveCSNs.update(baseDN, csn);
+ lastAliveCSNs.update(baseDN, updateMsg.getCSN());
tryNotify(oldestCSNBefore);
}
@@ -364,40 +319,42 @@
for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
- if (!isECLEnabledDomain(baseDN))
+ if (isECLEnabledDomain(baseDN))
{
- continue;
- }
+ for (Integer serverId : entry.getValue())
+ {
+ /*
+ * initialize with the oldest possible CSN in order for medium
+ * consistency to wait for all replicas to be alive before moving forward
+ */
+ lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
+ }
- for (Integer serverId : entry.getValue())
- {
- /*
- * initialize with the oldest possible CSN in order for medium
- * consistency to wait for all replicas to be alive before moving
- * forward
- */
- lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
- // start after the actual CSN when initializing from the previous cookie
- final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
- ensureCursorExists(baseDN, serverId, csn);
+ final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+ lastAliveCSNs.update(baseDN, latestKnownState);
}
-
- ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
- lastAliveCSNs.update(baseDN, latestKnownState);
}
- resetNextChangeForInsertDBCursor();
+
+ nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+ nextChangeForInsertDBCursor.next();
if (newestRecord != null)
{
// restore the "previousCookie" state before shutdown
- final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
+ UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
+ if (record instanceof ReplicaOfflineMsg)
+ {
+ // ignore: replica offline messages are never stored in the CNIndexDB
+ nextChangeForInsertDBCursor.next();
+ record = nextChangeForInsertDBCursor.getRecord();
+ }
+
// sanity check: ensure that when initializing the cursors at the previous
// cookie, the next change we find is the newest record in the CNIndexDB
if (!record.getCSN().equals(newestRecord.getCSN()))
{
- throw new ChangelogException(
- ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord
- .getCSN().toStringUI(), record.getCSN().toStringUI()));
+ throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
+ newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
}
// Now we can update the mediumConsistencyRUV
mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
@@ -428,68 +385,6 @@
return new CSN(0, 0, serverId);
}
- private void resetNextChangeForInsertDBCursor() throws ChangelogException
- {
- final Map<DBCursor<UpdateMsg>, DN> cursors =
- new HashMap<DBCursor<UpdateMsg>, DN>();
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
- : this.allCursors.entrySet())
- {
- for (Entry<Integer, DBCursor<UpdateMsg>> entry2
- : entry.getValue().entrySet())
- {
- cursors.put(entry2.getValue(), entry.getKey());
- }
- }
-
- // CNIndexer manages the cursor itself,
- // so do not try to recycle exhausted cursors
- CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
- result.next();
- nextChangeForInsertDBCursor = result;
- }
-
- private boolean ensureCursorExists(DN baseDN, Integer serverId,
- CSN startAfterCSN) throws ChangelogException
- {
- Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
- if (map == null)
- {
- map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
- allCursors.put(baseDN, map);
- }
- DBCursor<UpdateMsg> cursor = map.get(serverId);
- if (cursor == null)
- {
- final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
- cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
- cursor.next();
- map.put(serverId, cursor);
- return false;
- }
- return true;
- }
-
- /**
- * Returns the immediately preceding CSN.
- *
- * @param csn
- * the CSN to use
- * @return the immediately preceding CSN or null if the provided CSN is null.
- */
- CSN getPrecedingCSN(CSN csn)
- {
- if (csn == null)
- {
- return null;
- }
- if (csn.getSeqnum() > 0)
- {
- return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
- }
- return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
- }
-
/** {@inheritDoc} */
@Override
public void initiateShutdown()
@@ -509,8 +404,7 @@
{
/*
* initialize here to allow fast application start up and avoid errors due
- * cursors being created in a different thread to the one where they are
- * used.
+ * cursors being created in a different thread to the one where they are used.
*/
initialize();
@@ -520,26 +414,29 @@
{
if (!domainsToClear.isEmpty())
{
+ final DN cursorData = nextChangeForInsertDBCursor.getData();
+ final boolean callNextOnCursor =
+ cursorData == null || domainsToClear.contains(cursorData);
while (!domainsToClear.isEmpty())
{
final DN baseDNToClear = domainsToClear.first();
- removeCursors(baseDNToClear);
+ nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
// Only release the waiting thread
// once this domain's state has been cleared.
domainsToClear.remove(baseDNToClear);
}
- resetNextChangeForInsertDBCursor();
- }
- else
- {
- final boolean createdCursors = createNewCursors();
- final boolean recycledCursors = recycleExhaustedCursors();
- if (createdCursors || recycledCursors)
+
+ if (callNextOnCursor)
{
- resetNextChangeForInsertDBCursor();
+ // The next change to consume comes from a domain to be removed.
+ // Call DBCursor.next() to ensure this domain is removed
+ nextChangeForInsertDBCursor.next();
}
}
+ // Do not call DBCursor.next() here
+ // because we might not have consumed the last record,
+ // for example if we could not move the MCP forward
final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
if (msg == null)
{
@@ -551,8 +448,13 @@
}
wait();
}
- // loop to check whether new changes have been added to the
- // ReplicaDBs
+ // check whether new changes have been added to the ReplicaDBs
+ nextChangeForInsertDBCursor.next();
+ continue;
+ }
+ else if (msg instanceof ReplicaOfflineMsg)
+ {
+ nextChangeForInsertDBCursor.next();
continue;
}
@@ -599,37 +501,43 @@
}
catch (RuntimeException e)
{
- // Nothing can be done about it.
- // Rely on the DirectoryThread uncaught exceptions handler
- // for logging error + alert.
- // LocalizableMessage logged here gives corrective information to the administrator.
- logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
- getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ logUnexpectedException(e);
+ // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
throw e;
}
catch (Exception e)
{
- // Nothing can be done about it.
- // Rely on the DirectoryThread uncaught exceptions handler
- // for logging error + alert.
- // LocalizableMessage logged here gives corrective information to the administrator.
- logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
- getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ logUnexpectedException(e);
+ // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
throw new RuntimeException(e);
}
finally
{
- removeCursors(DN.NULL_DN);
+ nextChangeForInsertDBCursor.close();
+ nextChangeForInsertDBCursor = null;
}
}
+ /**
+ * Nothing can be done about it.
+ * <p>
+ * Rely on the DirectoryThread uncaught exceptions handler for logging error +
+ * alert.
+ * <p>
+ * Message logged here gives corrective information to the administrator.
+ */
+ private void logUnexpectedException(Exception e)
+ {
+ logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
+ getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ }
+
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
- boolean callNextOnCursor = true;
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -642,133 +550,22 @@
}
else if (offlineCSN.isOlderThan(mcCSN))
{
- Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
- pair = getCursor(mcBaseDN, mcCSN.getServerId());
- Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
- if (iter != null && !iter.hasNext())
- {
- /*
- * replica is not back online, Medium consistency point has gone past
- * its last offline time, and there are no more changes after the
- * offline CSN in the cursor: remove everything known about it:
- * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
- * this replica from the medium consistency RUV.
- */
- iter.remove();
- StaticUtils.close(pair.getFirst());
- resetNextChangeForInsertDBCursor();
- callNextOnCursor = false;
- lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
- }
+ /*
+ * replica is not back online, Medium consistency point has gone past
+ * its last offline time, and there are no more changes after the
+ * offline CSN in the cursor: remove everything known about it:
+ * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+ * this replica from the medium consistency RUV.
+ */
+ // TODO JNR how to close cursor for offline replica?
+ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
- if (callNextOnCursor)
- {
- // advance the cursor we just read from,
- // success/failure will be checked later
- nextChangeForInsertDBCursor.next();
- }
- }
-
- private void removeCursors(DN baseDN)
- {
- if (nextChangeForInsertDBCursor != null)
- {
- nextChangeForInsertDBCursor.close();
- nextChangeForInsertDBCursor = null;
- }
- if (DN.NULL_DN.equals(baseDN))
- {
- // close all cursors
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
- {
- StaticUtils.close(map.values());
- }
- allCursors.clear();
- newCursors.clear();
- }
- else
- {
- // close cursors for this DN
- final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
- if (map != null)
- {
- StaticUtils.close(map.values());
- }
- for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
- {
- if (it.next().getFirst().equals(baseDN))
- {
- it.remove();
- }
- }
- }
- }
-
- private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
- getCursor(final DN baseDN, final int serverId) throws ChangelogException
- {
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
- : allCursors.entrySet())
- {
- if (baseDN.equals(entry1.getKey()))
- {
- for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
- entry1.getValue().entrySet().iterator(); iter.hasNext();)
- {
- final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
- if (serverId == entry2.getKey())
- {
- return Pair.of(entry2.getValue(), iter);
- }
- }
- }
- }
- return Pair.empty();
- }
-
- private boolean recycleExhaustedCursors() throws ChangelogException
- {
- boolean succesfullyRecycled = false;
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
- {
- for (DBCursor<UpdateMsg> cursor : map.values())
- {
- // try to recycle it by calling next()
- if (cursor.getRecord() == null && cursor.next())
- {
- succesfullyRecycled = true;
- }
- }
- }
- return succesfullyRecycled;
- }
-
- private boolean createNewCursors() throws ChangelogException
- {
- if (!newCursors.isEmpty())
- {
- boolean newCursorAdded = false;
- for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
- newCursors.entrySet().iterator(); iter.hasNext();)
- {
- final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
- final DN baseDN = entry.getKey().getFirst();
- final CSN csn = entry.getValue();
- // start after preceding CSN so the first CSN read will exactly be the
- // current one
- final CSN startFromCSN = getPrecedingCSN(csn);
- if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
- {
- newCursorAdded = true;
- }
- iter.remove();
- }
- return newCursorAdded;
- }
- return false;
+ // advance the cursor we just read from,
+ // success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 52ef4c6..e0305cd 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
* @param <Data>
* The type of data associated with each cursor
*/
-final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
*/
private byte state = UNINITIALIZED;
- /** Whether this composite should try to recycle exhausted cursors. */
- private final boolean recycleExhaustedCursors;
/**
* These cursors are considered exhausted because they had no new changes the
* last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
/**
* The cursors are sorted based on the current change of each cursor to
* consider the next change across all available cursors.
+ * <p>
+ * New cursors for this Map must be created from the same thread that will
+ * make use of them. When this rule is not obeyed, a JE exception will be
+ * thrown about
+ * "Non-transactional Cursors may not be used in multiple threads;".
*/
- private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+ private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
new TreeMap<DBCursor<UpdateMsg>, Data>(
new Comparator<DBCursor<UpdateMsg>>()
{
@@ -81,25 +84,6 @@
}
});
- /**
- * Builds a CompositeDBCursor using the provided collection of cursors.
- *
- * @param cursors
- * the cursors that will be iterated upon.
- * @param recycleExhaustedCursors
- * whether a call to {@link #next()} tries to recycle exhausted
- * cursors
- */
- public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
- boolean recycleExhaustedCursors)
- {
- this.recycleExhaustedCursors = recycleExhaustedCursors;
- for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
- {
- put(entry);
- }
- }
-
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
@@ -108,51 +92,80 @@
{
return false;
}
- final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
+
+ // If previous state was ready, then we must advance the first cursor
+ // (which UpdateMsg has been consumed).
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove the first cursor, then add it again after moving it forward.
+ final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance =
+ state != UNINITIALIZED ? cursors.pollFirstEntry() : null;
state = READY;
- if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
+ recycleExhaustedCursors();
+ if (cursorToAdvance != null)
{
- // try to recycle empty cursors in case the underlying ReplicaDBs received
- // new changes.
+ addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
+ }
+
+ removeNoLongerNeededCursors();
+ incorporateNewCursors();
+ return !cursors.isEmpty();
+ }
+
+ private void recycleExhaustedCursors() throws ChangelogException
+ {
+ if (!exhaustedCursors.isEmpty())
+ {
+ // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
final Map<DBCursor<UpdateMsg>, Data> copy =
new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
{
- entry.getKey().next();
- put(entry);
- }
- final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
- if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
- {
- // if the first cursor was previously an exhausted cursor,
- // then we have already called next() on it.
- // Avoid calling it again because we know new changes have been found.
- return true;
+ addCursor(entry.getKey(), entry.getValue());
}
}
-
- // To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and add again the cursor after moving it forward.
- if (advanceNonExhaustedCursors)
- {
- Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
- if (firstEntry != null)
- {
- final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
- cursor.next();
- put(firstEntry);
- }
- }
- // no cursors are left with changes.
- return !cursors.isEmpty();
}
- private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
+ private void removeNoLongerNeededCursors()
{
- final DBCursor<UpdateMsg> cursor = entry.getKey();
- final Data data = entry.getValue();
- if (cursor.getRecord() != null)
+ for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
+ {
+ final Data dataToFind = iter.next();
+ for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
+ cursors.entrySet().iterator(); cursorIter.hasNext();)
+ {
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
+ if (dataToFind.equals(entry.getValue()))
+ {
+ entry.getKey().close();
+ cursorIter.remove();
+ }
+ }
+ iter.remove();
+ }
+ }
+
+ /**
+ * Returns an Iterator over the data associated to cursors that must be removed.
+ *
+ * @return an Iterator over the data associated to cursors that must be removed.
+ */
+ protected abstract Iterator<Data> removedCursorsIterator();
+
+ /**
+ * Adds a cursor to this composite cursor. It first calls
+ * {@link DBCursor#next()} to verify whether it is exhausted or not.
+ *
+ * @param cursor
+ * the cursor to add to this composite
+ * @param data
+ * the data associated to the provided cursor
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
+ {
+ if (cursor.next())
{
this.cursors.put(cursor, data);
}
@@ -166,6 +179,8 @@
@Override
public UpdateMsg getRecord()
{
+ // Cannot call incorporateNewCursors() here because
+ // somebody might have already called DBCursor.getRecord() and read the record
final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
if (entry != null)
{
@@ -175,6 +190,16 @@
}
/**
+ * Called when implementors should incorporate new cursors into the current
+ * composite DBCursor. Implementors should call
+ * {@link #addCursor(DBCursor, Object)} to do so.
+ *
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ protected abstract void incorporateNewCursors() throws ChangelogException;
+
+ /**
* Returns the data associated to the cursor that returned the current record.
*
* @return the data associated to the cursor that returned the current record.
@@ -193,8 +218,11 @@
@Override
public void close()
{
+ state = CLOSED;
StaticUtils.close(cursors.keySet());
StaticUtils.close(exhaustedCursors.keySet());
+ cursors.clear();
+ exhaustedCursors.clear();
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
new file mode 100644
index 0000000..3cf8b5d
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -0,0 +1,127 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a replication domain's replica DBs.
+ */
+public class DomainDBCursor extends CompositeDBCursor<Void>
+{
+
+ private final DN baseDN;
+ private final ReplicationDomainDB domainDB;
+
+ private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
+ new ConcurrentSkipListMap<Integer, CSN>();
+ /**
+ * Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
+ */
+ private static final CSN NULL_CSN = new CSN(0, 0, 0);
+
+ /**
+ * Builds a DomainDBCursor instance.
+ *
+ * @param baseDN
+ * the replication domain baseDN of this cursor
+ * @param domainDB
+ * the DB for the provided replication domain
+ */
+ public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+ {
+ this.baseDN = baseDN;
+ this.domainDB = domainDB;
+ }
+
+ /**
+ * Returns the replication domain baseDN of this cursor.
+ *
+ * @return the replication domain baseDN of this cursor.
+ */
+ public DN getBaseDN()
+ {
+ return baseDN;
+ }
+
+ /**
+ * Adds a replicaDB for this cursor to iterate over. Added cursors will be
+ * created and iterated over on the next call to {@link #next()}.
+ *
+ * @param serverId
+ * the serverId of the replica
+ * @param startAfterCSN
+ * the CSN after which to start iterating
+ */
+ public void addReplicaDB(int serverId, CSN startAfterCSN)
+ {
+ // only keep the oldest CSN that will be the new cursor's starting point
+ newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void incorporateNewCursors() throws ChangelogException
+ {
+ for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();)
+ {
+ final Entry<Integer, CSN> pair = iter.next();
+ final int serverId = pair.getKey();
+ final CSN csn = pair.getValue();
+ final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
+ final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ addCursor(cursor, null);
+ iter.remove();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Iterator<Void> removedCursorsIterator()
+ {
+ return Collections.EMPTY_LIST.iterator(); // nothing to remove
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ super.close();
+ domainDB.unregisterCursor(this);
+ newReplicas.clear();
+ }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 7e75bf1..dc23a7e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -72,13 +74,20 @@
* <li>then check it's not null</li>
* <li>then close all inside</li>
* </ol>
- * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+ * When creating a replicaDB, synchronize on the domainMap to avoid
* concurrent shutdown.
*/
- private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
- domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
- private ReplicationDbEnv dbEnv;
- private ReplicationServerCfg config;
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+ /**
+ * \@GuardedBy("itself")
+ */
+ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+ new HashMap<DN, List<DomainDBCursor>>();
+ private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+ new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private ReplicationDbEnv replicationEnv;
+ private final ReplicationServerCfg config;
private final File dbDirectory;
/**
@@ -103,9 +112,9 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
- private AtomicBoolean shutdown = new AtomicBoolean();
+ private final AtomicBoolean shutdown = new AtomicBoolean();
- private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+ private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
new DBCursor<UpdateMsg>()
{
@@ -135,7 +144,7 @@
};
/**
- * Builds an instance of this class.
+ * Creates a new changelog DB.
*
* @param replicationServer
* the local replication server.
@@ -144,15 +153,15 @@
* @throws ConfigException
* if a problem occurs opening the supplied directory
*/
- public JEChangelogDB(ReplicationServer replicationServer,
- ReplicationServerCfg config) throws ConfigException
+ public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+ throws ConfigException
{
this.config = config;
this.replicationServer = replicationServer;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
- private File makeDir(String dbDirName) throws ConfigException
+ private File makeDir(final String dbDirName) throws ConfigException
{
// Check that this path exists or create it.
final File dbDirectory = getFileForPath(dbDirName);
@@ -168,15 +177,13 @@
{
logger.traceException(e);
- final LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
- mb.append(e.getLocalizedMessage());
- mb.append(" ");
- mb.append(dbDirectory);
- throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e);
+ final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
+ e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
+ throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
}
}
- private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
+ private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
{
final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
if (domainMap != null)
@@ -186,29 +193,12 @@
return Collections.emptyMap();
}
- private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
+ private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
{
return getDomainMap(baseDN).get(serverId);
}
/**
- * Provision resources for the specified serverId in the specified replication
- * domain.
- *
- * @param baseDN
- * the replication domain where to add the serverId
- * @param serverId
- * the server Id to add to the replication domain
- * @throws ChangelogException
- * If a database error happened.
- */
- private void commission(DN baseDN, int serverId, ReplicationServer rs)
- throws ChangelogException
- {
- getOrCreateReplicaDB(baseDN, serverId, rs);
- }
-
- /**
* Returns a {@link JEReplicaDB}, possibly creating it.
*
* @param baseDN
@@ -217,35 +207,42 @@
* the serverId for which to create a ReplicaDB
* @param server
* the ReplicationServer
- * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
- * to be created
+ * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
* @throws ChangelogException
* if a problem occurred with the database
*/
- Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
- int serverId, ReplicationServer server) throws ChangelogException
+ Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+ final ReplicationServer server) throws ChangelogException
{
while (!shutdown.get())
{
- final ConcurrentMap<Integer, JEReplicaDB> domainMap =
- getExistingOrNewDomainMap(baseDN);
- final Pair<JEReplicaDB, Boolean> result =
- getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+ final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
if (result != null)
{
+ final Boolean dbWasCreated = result.getSecond();
+ if (dbWasCreated)
+ { // new replicaDB => update all cursors with it
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (DomainDBCursor cursor : cursors)
+ {
+ cursor.addReplicaDB(serverId, null);
+ }
+ }
+ }
+
return result;
}
}
- throw new ChangelogException(
- ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+ throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
- private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
- DN baseDN)
+ private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
{
// happy path: the domainMap already exists
- final ConcurrentMap<Integer, JEReplicaDB> currentValue =
- domainToReplicaDBs.get(baseDN);
+ final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
if (currentValue != null)
{
return currentValue;
@@ -254,30 +251,36 @@
// unlucky, the domainMap does not exist: take the hit and create the
// newValue, even though the same could be done concurrently by another
// thread
- final ConcurrentMap<Integer, JEReplicaDB> newValue =
- new ConcurrentHashMap<Integer, JEReplicaDB>();
- final ConcurrentMap<Integer, JEReplicaDB> previousValue =
- domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+ final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+ final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
if (previousValue != null)
{
// there was already a value associated to the key, let's use it
return previousValue;
}
+
+ if (MultimasterReplication.isECLEnabledDomain(baseDN))
+ {
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
+ }
return newValue;
}
- private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
- final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
- DN baseDN, ReplicationServer server) throws ChangelogException
+ private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+ final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
{
- // happy path: the JEReplicaDB already exists
+ // happy path: the replicaDB already exists
JEReplicaDB currentValue = domainMap.get(serverId);
if (currentValue != null)
{
return Pair.of(currentValue, false);
}
- // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+ // unlucky, the replicaDB does not exist: take the hit and synchronize
// on the domainMap to create a new ReplicaDB
synchronized (domainMap)
{
@@ -293,11 +296,11 @@
// The domainMap could have been concurrently removed because
// 1) a shutdown was initiated or 2) an initialize was called.
// Return will allow the code to:
- // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+ // 1) shutdown properly or 2) lazily recreate the replicaDB
return null;
}
- final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+ final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
domainMap.put(serverId, newDB);
return Pair.of(newDB, true);
}
@@ -310,8 +313,8 @@
try
{
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
- dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
- final ChangelogState changelogState = dbEnv.getChangelogState();
+ replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+ final ChangelogState changelogState = replicationEnv.getChangelogState();
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
@@ -338,12 +341,12 @@
{
for (int serverId : entry.getValue())
{
- commission(entry.getKey(), serverId, replicationServer);
+ getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
}
}
}
- private void shutdownCNIndexDB() throws ChangelogException
+ private void shutdownChangeNumberIndexDB() throws ChangelogException
{
synchronized (cnIndexDBLock)
{
@@ -381,7 +384,7 @@
try
{
- shutdownCNIndexDB();
+ shutdownChangeNumberIndexDB();
}
catch (ChangelogException e)
{
@@ -402,7 +405,7 @@
}
}
- if (dbEnv != null)
+ if (replicationEnv != null)
{
// wait for shutdown of the threads holding cursors
try
@@ -421,7 +424,7 @@
// do nothing: we are already shutting down
}
- dbEnv.shutdown();
+ replicationEnv.shutdown();
}
if (firstException != null)
@@ -431,11 +434,10 @@
}
/**
- * Clears all content from the changelog database, but leaves its directory on
- * the filesystem.
+ * Clears all records from the changelog (does not remove the changelog itself).
*
* @throws ChangelogException
- * If a database problem happened
+ * If an error occurs when clearing the changelog.
*/
public void clearDB() throws ChangelogException
{
@@ -469,7 +471,7 @@
try
{
- shutdownCNIndexDB();
+ shutdownChangeNumberIndexDB();
}
catch (ChangelogException e)
{
@@ -584,7 +586,7 @@
// 3- clear the changelogstate DB
try
{
- dbEnv.clearGenerationId(baseDN);
+ replicationEnv.clearGenerationId(baseDN);
}
catch (ChangelogException e)
{
@@ -635,7 +637,7 @@
{
if (computeChangeNumber)
{
- startIndexer(dbEnv.getChangelogState());
+ startIndexer(replicationEnv.getChangelogState());
}
else
{
@@ -673,7 +675,7 @@
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+ cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
}
catch (Exception e)
{
@@ -694,40 +696,57 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
- throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
{
- final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
- final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
- for (int serverId : serverIds)
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ registeredMultiDomainCursors.add(cursor);
+ for (DN baseDN : domainToReplicaDBs.keySet())
{
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
- final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
- replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
- cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+ cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
}
- // recycle exhausted cursors,
- // because client code will not manage the cursors itself
- return new CompositeDBCursor<Void>(cursors, true);
+ return cursor;
}
- private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
- ServerState startAfterServerState)
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+ throws ChangelogException
{
- final ServerState domainState = offlineReplicas.getServerState(baseDN);
- if (domainState != null)
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ for (int serverId : getDomainMap(baseDN).keySet())
{
- for (CSN offlineCSN : domainState)
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ cursor.addReplicaDB(serverId, lastCSN);
+ }
+ return cursor;
+ }
+
+ private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ {
+ synchronized (registeredDomainCursors)
+ {
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors == null)
{
- if (serverId == offlineCSN.getServerId()
- && !startAfterServerState.cover(offlineCSN))
- {
- return offlineCSN;
- }
+ cursors = new ArrayList<DomainDBCursor>();
+ registeredDomainCursors.put(baseDN, cursors);
}
+ cursors.add(cursor);
+ return cursor;
+ }
+ }
+
+ private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+ {
+ final MultiDomainServerState offlineReplicas =
+ replicationEnv.getChangelogState().getOfflineReplicas();
+ final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+ if (offlineCSN != null
+ && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+ {
+ return offlineCSN;
}
return null;
}
@@ -737,31 +756,57 @@
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
throws ChangelogException
{
- JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- return replicaDB.generateCursorFrom(startAfterCSN);
+ final DBCursor<UpdateMsg> cursor =
+ replicaDB.generateCursorFrom(startAfterCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ // TODO JNR if (offlineCSN != null) ??
+ // What about replicas that suddenly become offline?
+ return new ReplicaOfflineCursor(cursor, offlineCSN);
}
- return EMPTY_CURSOR;
+ return EMPTY_CURSOR_REPLICA_DB;
}
/** {@inheritDoc} */
@Override
- public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
- throws ChangelogException
+ public void unregisterCursor(final DBCursor<?> cursor)
{
- final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
- updateMsg.getCSN().getServerId(), replicationServer);
- final JEReplicaDB replicaDB = pair.getFirst();
- final boolean wasCreated = pair.getSecond();
+ if (cursor instanceof MultiDomainDBCursor)
+ {
+ registeredMultiDomainCursors.remove(cursor);
+ }
+ else if (cursor instanceof DomainDBCursor)
+ {
+ final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+ synchronized (registeredMultiDomainCursors)
+ {
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ }
+ /** {@inheritDoc} */
+ @Override
+ public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+ {
+ final CSN csn = updateMsg.getCSN();
+ final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+ csn.getServerId(), replicationServer);
+ final JEReplicaDB replicaDB = pair.getFirst();
replicaDB.add(updateMsg);
+
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
indexer.publishUpdateMsg(baseDN, updateMsg);
}
- return wasCreated;
+ return pair.getSecond(); // replica DB was created
}
/** {@inheritDoc} */
@@ -779,7 +824,7 @@
@Override
public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
{
- dbEnv.addOfflineReplica(baseDN, offlineCSN);
+ replicationEnv.addOfflineReplica(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
new file mode 100644
index 0000000..9f78065
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -0,0 +1,123 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a all the replication domain known to the changelog DB.
+ */
+public class MultiDomainDBCursor extends CompositeDBCursor<DN>
+{
+ private final ReplicationDomainDB domainDB;
+
+ private final ConcurrentSkipListMap<DN, ServerState> newDomains =
+ new ConcurrentSkipListMap<DN, ServerState>();
+ private final ConcurrentSkipListSet<DN> removeDomains =
+ new ConcurrentSkipListSet<DN>();
+
+ /**
+ * Builds a MultiDomainDBCursor instance.
+ *
+ * @param domainDB
+ * the replication domain management DB
+ */
+ public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+ {
+ this.domainDB = domainDB;
+ }
+
+ /**
+ * Adds a replication domain for this cursor to iterate over. Added cursors
+ * will be created and iterated over on the next call to {@link #next()}.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ * @param startAfterState
+ * the {@link ServerState} after which to start iterating
+ */
+ public void addDomain(DN baseDN, ServerState startAfterState)
+ {
+ newDomains.put(baseDN,
+ startAfterState != null ? startAfterState : new ServerState());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void incorporateNewCursors() throws ChangelogException
+ {
+ for (Iterator<Entry<DN, ServerState>> iter = newDomains.entrySet().iterator();
+ iter.hasNext();)
+ {
+ final Entry<DN, ServerState> entry = iter.next();
+ final DN baseDN = entry.getKey();
+ final ServerState serverState = entry.getValue();
+ final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+ addCursor(domainDBCursor, baseDN);
+ iter.remove();
+ }
+ }
+
+ /**
+ * Removes a replication domain from this cursor and stops iterating over it.
+ * Removed cursors will be effectively removed on the next call to
+ * {@link #next()}.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ */
+ public void removeDomain(DN baseDN)
+ {
+ removeDomains.add(baseDN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected Iterator<DN> removedCursorsIterator()
+ {
+ return removeDomains.iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ super.close();
+ domainDB.unregisterCursor(this);
+ newDomains.clear();
+ removeDomains.clear();
+ }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index d8d536c..ebf4df5 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -95,6 +95,16 @@
public class ExternalChangeLogTest extends ReplicationTestCase
{
+ private static class Results
+ {
+
+ public final List<SearchResultEntryProtocolOp> searchResultEntries =
+ new ArrayList<SearchResultEntryProtocolOp>();
+ public long searchReferences;
+ public long searchesDone;
+
+ }
+
private static final int SERVER_ID_1 = 1201;
private static final int SERVER_ID_2 = 1202;
@@ -188,14 +198,15 @@
@Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
public void TestWithAndWithoutControl() throws Exception
{
+ final String tn = "TestWithAndWithoutControl";
replicationServer.getChangelogDB().setPurgeDelay(0);
// Write changes and read ECL from start
- ECLCompatWriteReadAllOps(1);
+ ECLCompatWriteReadAllOps(1, tn);
ECLCompatNoControl(1);
// Write additional changes and read ECL from a provided change number
- ECLCompatWriteReadAllOps(5);
+ ECLCompatWriteReadAllOps(5, tn);
}
@Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
@@ -293,12 +304,13 @@
@Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
public void ECLReplicationServerFullTest15() throws Exception
{
+ final String tn = "ECLReplicationServerFullTest15";
replicationServer.getChangelogDB().setPurgeDelay(0);
// Write 4 changes and read ECL from start
- ECLCompatWriteReadAllOps(1);
+ ECLCompatWriteReadAllOps(1, tn);
// Write 4 additional changes and read ECL from a provided change number
- CSN csn = ECLCompatWriteReadAllOps(5);
+ CSN csn = ECLCompatWriteReadAllOps(5, tn);
// Test request from a provided change number - read 6
ECLCompatReadFrom(6, csn);
@@ -895,15 +907,12 @@
final CSN[] csns = generateCSNs(3, SERVER_ID_1);
publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
- Thread.sleep(1000);
-
- // Test that last cookie has been updated
- String cookieNotEmpty = readLastCookie();
- debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+ final String firstCookie = assertLastCookieDifferentThanLastValue("");
+ String lastCookie = firstCookie;
publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
// ---
// 2. Now set up a very short purge delay on the replication changelogs
@@ -930,7 +939,7 @@
// returns the appropriate error.
debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+ searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
assertTrue(searchOp.getErrorMessage().toString().startsWith(
ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
searchOp.getErrorMessage().toString());
@@ -962,26 +971,21 @@
final CSN[] csns = generateCSNs(3, SERVER_ID_1);
publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
- Thread.sleep(1000);
-
- // Test that last cookie has been updated
- String cookieNotEmpty = readLastCookie();
- debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+ final String firstCookie = assertLastCookieDifferentThanLastValue("");
+ String lastCookie = firstCookie;
publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
// ---
// 2. Now remove the domain by sending a reset message
- ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
- server01.publish(msg);
+ server01.publish(new ResetGenerationIdMsg(23657));
// ---
// 3. Assert that a request with an empty cookie returns nothing
// since replication changelog has been cleared
String cookie= "";
- InternalSearchOperation searchOp = null;
searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
// ---
@@ -989,7 +993,7 @@
// since replication changelog has been cleared
cookie = readLastCookie();
debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
+ searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
// ---
// 5. Assert that a request with an "old" cookie - one that refers to
@@ -997,7 +1001,8 @@
// returns the appropriate error.
debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+ final InternalSearchOperation searchOp =
+ searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
}
finally
@@ -1007,6 +1012,23 @@
debugInfo(testName, "Ending test successfully");
}
+ private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
+ {
+ int cnt = 0;
+ while (cnt < 100)
+ {
+ final String newCookie = readLastCookie();
+ if (!newCookie.equals(lastCookie))
+ {
+ return newCookie;
+ }
+ cnt++;
+ Thread.sleep(10);
+ }
+ Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
+ return null;// dead code
+ }
+
private void debugAndWriteEntries(LDIFWriter ldifWriter,
List<SearchResultEntry> entries, String tn) throws Exception
{
@@ -1074,10 +1096,11 @@
// Publish ADD
csnCounter++;
- String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
- + "objectClass: top\n" + "objectClass: domain\n"
- + "entryUUID: "+user1entryUUID+"\n";
- Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+ Entry entry = TestCaseUtils.entryFromLdifString(
+ "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "entryUUID: " + user1entryUUID + "\n");
AddMsg addMsg = new AddMsg(
csns[csnCounter],
DN.valueOf("uid="+tn+"2," + TEST_ROOT_DN_STRING),
@@ -1412,49 +1435,27 @@
InvocationCounterPlugin.resetAllCounters();
- long searchEntries;
- long searchReferences = ldapStatistics.getSearchResultReferences();
- long searchesDone = ldapStatistics.getSearchResultsDone();
+ final Results results = new Results();
+ results.searchReferences = ldapStatistics.getSearchResultReferences();
+ results.searchesDone = ldapStatistics.getSearchResultsDone();
debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
- LDAPMessage message = new LDAPMessage(2, searchRequest, controls);
- w.writeMessage(message);
+ w.writeMessage(new LDAPMessage(2, searchRequest, controls));
Thread.sleep(500);
if (!changesOnly)
{
// Wait for change 1
debugInfo(tn, "Waiting for init search expected to return change 1");
- searchEntries = 0;
+ readMessages(tn, r, results, 1, "Init search Result=");
+ for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
{
- while (searchEntries < 1 && (message = r.readMessage()) != null)
- {
- debugInfo(tn, "Init search Result=" +
- message.getProtocolOpType() + message + " " + searchEntries);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- SearchResultEntryProtocolOp searchResultEntry =
- message.getSearchResultEntryProtocolOp();
- searchEntries++;
- // FIXME:ECL Double check 1 is really the valid value here.
- checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
- (compatMode?"1":"0"));
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- searchReferences++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
- }
+ // FIXME:ECL Double check 1 is really the valid value here.
+ final String cn = compatMode ? "1" : "0";
+ checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
}
debugInfo(tn, "INIT search done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
+ + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
}
// Produces change 2
@@ -1470,30 +1471,8 @@
" published , psearch will now wait for new entries");
// wait for the 1 new entry
- searchEntries = 0;
- SearchResultEntryProtocolOp searchResultEntry = null;
- while (searchEntries < 1 && (message = r.readMessage()) != null)
- {
- debugInfo(tn, "psearch search Result=" +
- message.getProtocolOpType() + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchResultEntry = message.getSearchResultEntryProtocolOp();
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- searchReferences++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r, results, 1, "psearch search Result=");
+ SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
Thread.sleep(1000);
// Check we received change 2
@@ -1523,11 +1502,12 @@
createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
debugInfo(tn, "ACI test : sending search");
- message = new LDAPMessage(2, searchRequest, createCookieControl(""));
- w.writeMessage(message);
+ w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
- searchesDone=0;
- searchEntries = 0;
+ LDAPMessage message;
+ int searchesDone = 0;
+ int searchEntries = 0;
+ int searchReferences = 0;
while ((searchesDone==0) && (message = r.readMessage()) != null)
{
debugInfo(tn, "ACI test : message returned " +
@@ -1719,125 +1699,53 @@
InvocationCounterPlugin.resetAllCounters();
- ldapStatistics.getSearchRequests();
- long searchEntries = ldapStatistics.getSearchResultEntries();
- ldapStatistics.getSearchResultReferences();
- long searchesDone = ldapStatistics.getSearchResultsDone();
+ final Results results = new Results();
+ results.searchesDone = ldapStatistics.getSearchResultsDone();
- LDAPMessage message;
- message = new LDAPMessage(2, searchRequest1, controls);
- w1.writeMessage(message);
+ w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
Thread.sleep(500);
-
- message = new LDAPMessage(2, searchRequest2, controls);
- w2.writeMessage(message);
+ w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
Thread.sleep(500);
-
- message = new LDAPMessage(2, searchRequest3, controls);
- w3.writeMessage(message);
+ w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
Thread.sleep(500);
if (!changesOnly)
{
debugInfo(tn, "Search1 Persistent filter=" + searchRequest1.getFilter()
+ " expected to return change " + csn1);
- searchEntries = 0;
- message = null;
-
{
- while (searchEntries < 1 && (message = r1.readMessage()) != null)
+ readMessages(tn, r1, results, 1, "Search1 Result=");
+ final int searchEntries = results.searchResultEntries.size();
+ if (searchEntries == 1)
{
- debugInfo(tn, "Search1 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- SearchResultEntryProtocolOp searchResultEntry =
- message.getSearchResultEntryProtocolOp();
- searchEntries++;
- if (searchEntries==1)
- {
- checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
- checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
- (compatMode?"10":"0"));
- }
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
+ final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
+ final String cn = compatMode ? "10" : "0";
+ checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
+ checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
}
+ debugInfo(tn, "Search1 done with success. searchEntries="
+ + searchEntries + " #searchesDone=" + results.searchesDone);
}
- debugInfo(tn, "Search1 done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
- searchEntries = 0;
- message = null;
{
debugInfo(tn, "Search 2 Persistent filter=" + searchRequest2.getFilter()
+ " expected to return change " + csn2 + " & " + csn3);
- while (searchEntries < 2 && (message = r2.readMessage()) != null)
+ readMessages(tn, r2, results, 2, "Search 2 Result=");
+ for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
{
- debugInfo(tn, "Search 2 Result=" +
- message.getProtocolOpType() + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- SearchResultEntryProtocolOp searchResultEntry =
- message.getSearchResultEntryProtocolOp();
- searchEntries++;
- checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
- (compatMode?"10":"0"));
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
+ final String cn = compatMode ? "10" : "0";
+ checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
}
+ debugInfo(tn, "Search2 done with success. searchEntries="
+ + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
}
- debugInfo(tn, "Search2 done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
- searchEntries = 0;
- message = null;
- {
- debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter()
- + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
- while (searchEntries < 4 && (message = r3.readMessage()) != null)
- {
- debugInfo(tn, "Search3 Result=" +
- message.getProtocolOpType() + " " + message);
-
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
- }
- }
+ debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter()
+ + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
+ readMessages(tn, r3, results, 4, "Search3 Result=");
debugInfo(tn, "Search3 done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
-
+ + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
}
// Produces additional change
@@ -1871,82 +1779,19 @@
debugInfo(tn, delMsg13.getCSN() + " published additionally ");
// wait 11
- searchEntries = 0;
- message = null;
- while (searchEntries < 1 && (message = r1.readMessage()) != null)
- {
- debugInfo(tn, "Search 11 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r1, results, 1, "Search 11 Result=");
Thread.sleep(1000);
debugInfo(tn, "Search 1 successfully receives additional changes");
// wait 12 & 13
- searchEntries = 0;
- message = null;
- while (searchEntries < 2 && (message = r2.readMessage()) != null)
- {
- debugInfo(tn, "psearch search 12 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r2, results, 2, "psearch search 12 Result=");
Thread.sleep(1000);
debugInfo(tn, "Search 2 successfully receives additional changes");
// wait 11 & 12 & 13
- searchEntries = 0;
- SearchResultEntryProtocolOp searchResultEntry = null;
- message = null;
- while (searchEntries < 3 && (message = r3.readMessage()) != null)
- {
- debugInfo(tn, "psearch search 13 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchResultEntry = message.getSearchResultEntryProtocolOp();
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r3, results, 3, "psearch search 13 Result=");
+ SearchResultEntryProtocolOp searchResultEntry =
+ results.searchResultEntries.get(results.searchResultEntries.size() - 1);
Thread.sleep(1000);
// Check we received change 13
@@ -1961,6 +1806,35 @@
debugInfo(tn, "Ends test successfully");
}
+ private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
+ final Results results, final int i, final String string) throws Exception
+ {
+ results.searchResultEntries.clear();
+
+ LDAPMessage message;
+ while (results.searchResultEntries.size() < i
+ && (message = r.readMessage()) != null)
+ {
+ debugInfo(tn, string + message.getProtocolOpType() + " " + message);
+
+ switch (message.getProtocolOpType())
+ {
+ case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
+ results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
+ break;
+
+ case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
+ results.searchReferences++;
+ break;
+
+ case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
+ assertSuccessful(message);
+ results.searchesDone++;
+ break;
+ }
+ }
+ }
+
private void assertSuccessful(LDAPMessage message)
{
SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
@@ -2007,10 +1881,9 @@
new BindRequestProtocolOp(
ByteString.valueOf(bindDN),
3, ByteString.valueOf(password));
- LDAPMessage message = new LDAPMessage(1, bindRequest);
- w.writeMessage(message);
+ w.writeMessage(new LDAPMessage(1, bindRequest));
- message = r.readMessage();
+ final LDAPMessage message = r.readMessage();
BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
assertEquals(bindResponse.getResultCode(), expected);
@@ -2204,9 +2077,9 @@
debugInfo(tn, "Ending test successfully");
}
- private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
+ private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
{
- String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
+ String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
debugInfo(tn, "Starting test\n\n");
LDAPReplicationDomain domain = null;
try
@@ -2224,17 +2097,16 @@
CSN[] csns = generateCSNs(4, SERVER_ID_1);
// Publish DEL
- DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
+ DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
server01.publish(delMsg);
debugInfo(tn, " publishes " + delMsg.getCSN());
// Publish ADD
- String lentry =
- "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
+ Entry entry = TestCaseUtils.entryFromLdifString(
+ "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
+ "objectClass: top\n"
+ "objectClass: domain\n"
- + "entryUUID: "+user1entryUUID+"\n";
- Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+ + "entryUUID: " + user1entryUUID + "\n");
AddMsg addMsg = new AddMsg(
csns[1],
entry.getName(),
@@ -2247,7 +2119,7 @@
debugInfo(tn, " publishes " + addMsg.getCSN());
// Publish MOD
- DN baseDN = DN.valueOf("uid="+tn+"3," + TEST_ROOT_DN_STRING);
+ DN baseDN = DN.valueOf("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
List<Modification> mods = createMods("description", "new value");
ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
server01.publish(modMsg);
@@ -2255,7 +2127,7 @@
// Publish modDN
ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
- DN.valueOf("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
+ DN.valueOf("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
RDN.decode("uid="+tn+"new4"), // new rdn
true, // deleteoldrdn
TEST_ROOT_DN2); // new superior
@@ -2265,8 +2137,8 @@
server01.publish(modDNMsg);
debugInfo(tn, " publishes " + modDNMsg.getCSN());
- String filter = "(targetdn=*" + tn + "*,o=test)";
- InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
+ InternalSearchOperation searchOp =
+ searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
// test 4 entries returned
final LDIFWriter ldifWriter = getLDIFWriter();
@@ -2276,7 +2148,7 @@
stop(server01);
// Test with filter on change number
- filter =
+ String filter =
"(&(targetdn=*" + tn + "*,o=test)"
+ "(&(changenumber>=" + firstChangeNumber + ")"
+ "(changenumber<=" + (firstChangeNumber + 3) + ")))";
@@ -2339,7 +2211,7 @@
long firstChangeNumber, int i, String tn, CSN csn)
{
final long changeNumber = firstChangeNumber + i;
- final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING;
+ final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
assertDNEquals(resultEntry, changeNumber);
checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
@@ -2352,9 +2224,11 @@
private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
{
- String actualDN = resultEntry.getName().toNormalizedString();
- String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
- assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
+ final String actualDN = resultEntry.getName().toNormalizedString();
+ final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
+ assertThat(actualDN)
+ .as("Unexpected DN for entry " + resultEntry)
+ .isEqualToIgnoringCase(expectedDN);
}
private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
@@ -2548,7 +2422,7 @@
while (!cnIndexDB.isEmpty())
{
debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
- Thread.sleep(200);
+ Thread.sleep(10);
}
debugInfo(tn, "Ending test with success");
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index bf5c2da..23c9318 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -128,7 +128,11 @@
private ChangeNumberIndexDB cnIndexDB;
@Mock
private ReplicationDomainDB domainDB;
- private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
+
+ private List<DN> eclEnabledDomains;
+ private MultiDomainDBCursor multiDomainCursor;
+ private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors;
+ private Map<DN, DomainDBCursor> domainDBCursors;
private ChangelogState initialState;
private Map<DN, ServerState> domainNewestCSNs;
private ChangeNumberIndexer cnIndexer;
@@ -153,13 +157,18 @@
public void setup() throws Exception
{
MockitoAnnotations.initMocks(this);
- when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
- when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+ multiDomainCursor = new MultiDomainDBCursor(domainDB);
initialState = new ChangelogState();
initialCookie = new MultiDomainServerState();
- cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ domainDBCursors = new HashMap<DN, DomainDBCursor>();
domainNewestCSNs = new HashMap<DN, ServerState>();
+
+ when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
+ when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+ when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
+ multiDomainCursor);
}
@AfterMethod
@@ -173,15 +182,17 @@
@Test
public void emptyDBNoDS() throws Exception
{
- startCNIndexer(BASE_DN1);
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneDS() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -192,10 +203,11 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBOneDS() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
addReplica(BASE_DN1, serverId1);
setCNIndexDBInitialRecords(msg1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
@@ -206,9 +218,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSs() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
// simulate messages received out of order
@@ -224,9 +237,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsDifferentDomains() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN2, serverId2);
- startCNIndexer(BASE_DN1, BASE_DN2);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -259,8 +273,9 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -287,12 +302,13 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBTwoDSs() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
setCNIndexDBInitialRecords(msg1, msg2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
@@ -312,9 +328,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
@@ -329,10 +346,11 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(ADMIN_DATA_DN, serverId1);
addReplica(BASE_DN1, serverId2);
addReplica(BASE_DN1, serverId3);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
// cn=admin data will does not participate in the external changelog
@@ -350,8 +368,9 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -371,8 +390,9 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -390,9 +410,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -407,9 +428,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneGoingOffline() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -440,10 +462,11 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -473,12 +496,13 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
// blocked until we receive info for serverId2
assertExternalChangelogContent();
@@ -517,13 +541,14 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
publishUpdateMsg(msg2, msg3);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
// MCP moves forward because serverId1 is not really offline
@@ -540,9 +565,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneKilled() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -562,10 +588,26 @@
private void addReplica(DN baseDN, int serverId) throws Exception
{
- final SequentialDBCursor cursor = new SequentialDBCursor();
- cursors.put(Pair.of(baseDN, serverId), cursor);
- when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
- .thenReturn(cursor);
+ final SequentialDBCursor replicaDBCursor = new SequentialDBCursor();
+ replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor);
+
+ if (isECLEnabledDomain2(baseDN))
+ {
+ DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
+ if (domainDBCursor == null)
+ {
+ domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+ domainDBCursors.put(baseDN, domainDBCursor);
+
+ multiDomainCursor.addDomain(baseDN, null);
+ when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+ .thenReturn(domainDBCursor);
+ }
+ domainDBCursor.addReplicaDB(serverId, null);
+ when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+ .thenReturn(replicaDBCursor);
+ }
+
when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
getDomainNewestCSNs(baseDN));
initialState.addServerIdToDomain(serverId, baseDN);
@@ -582,21 +624,26 @@
return serverState;
}
- private void startCNIndexer(DN... eclEnabledDomains)
+ private void startCNIndexer()
{
- final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
{
@Override
protected boolean isECLEnabledDomain(DN baseDN)
{
- return eclEnabledDomainList.contains(baseDN);
+ return isECLEnabledDomain2(baseDN);
}
+
};
cnIndexer.start();
waitForWaitingState(cnIndexer);
}
+ private boolean isECLEnabledDomain2(DN baseDN)
+ {
+ return eclEnabledDomains.contains(baseDN);
+ }
+
private void stopCNIndexer() throws Exception
{
if (cnIndexer != null)
@@ -631,7 +678,8 @@
final CSN csn = newestMsg.getCSN();
when(cnIndexDB.getNewestRecord()).thenReturn(
new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
- final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
+ final SequentialDBCursor cursor =
+ replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
cursor.add(newestMsg);
}
initialCookie.update(msg.getBaseDN(), msg.getCSN());
@@ -643,7 +691,7 @@
for (ReplicatedUpdateMsg msg : msgs)
{
final SequentialDBCursor cursor =
- cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
+ replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
if (msg.isEmptyCursor())
{
cursor.add(null);
@@ -746,11 +794,4 @@
};
}
- @Test(dataProvider = "precedingCSNDataProvider")
- public void getPrecedingCSN(CSN start, CSN expected)
- {
- cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
- CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
- assertThat(precedingCSN).isEqualTo(expected);
- }
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 5f54b10..da8bcd0 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,8 +25,8 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collections;
+import java.util.Iterator;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -45,6 +45,20 @@
public class CompositeDBCursorTest extends DirectoryServerTestCase
{
+ private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String>
+ {
+ @Override
+ protected void incorporateNewCursors() throws ChangelogException
+ {
+ }
+
+ @Override
+ protected Iterator<String> removedCursorsIterator()
+ {
+ return Collections.EMPTY_LIST.iterator();
+ }
+ }
+
private UpdateMsg msg1;
private UpdateMsg msg2;
private UpdateMsg msg3;
@@ -173,8 +187,6 @@
of(msg4, baseDN1));
}
- // TODO : this test fails because msg2 is returned twice
- @Test(enabled=false)
public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -220,16 +232,12 @@
private CompositeDBCursor<String> newCompositeDBCursor(
Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
{
- final Map<DBCursor<UpdateMsg>, String> cursorsMap =
- new HashMap<DBCursor<UpdateMsg>, String>();
+ final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor();
for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
{
- // The cursors in the composite are expected to be pointing
- // to first record available
- pair.getFirst().next();
- cursorsMap.put(pair.getFirst(), pair.getSecond());
+ cursor.addCursor(pair.getFirst(), pair.getSecond());
}
- return new CompositeDBCursor<String>(cursorsMap, true);
+ return cursor;
}
private void assertInOrder(final CompositeDBCursor<String> compCursor,
--
Gitblit v1.10.0