From 6531523345efd08282b1454dc06e572ae9eff848 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 244 ++++++----
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 35
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 37 +
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 359 +++------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 132 +++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 130 +++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 115 +++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 154 ++++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 161 +++++-
9 files changed, 829 insertions(+), 538 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 4639e7e..56b0509 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,10 @@
package org.opends.server.replication.server.changelog.api;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
/**
@@ -89,6 +91,26 @@
*/
void removeDomain(DN baseDN) throws ChangelogException;
+ /**
+ * Generates a {@link DBCursor} across all the domains starting after the
+ * provided {@link MultiDomainServerState} for each domain.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link DBCursor#close()} method to free the resources and locks used by the
+ * cursor.
+ *
+ * @param startAfterState
+ * Starting point for each domain cursor. If any {@link ServerState}
+ * for a domain is null, then start from the oldest CSN for each
+ * replicaDBs
+ * @return a non null {@link DBCursor}
+ * @throws ChangelogException
+ * If a database problem happened
+ * @see #getCursorFrom(DN, ServerState)
+ */
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+ throws ChangelogException;
+
// serverId methods
/**
@@ -102,16 +124,17 @@
*
* @param baseDN
* the replication domain baseDN
- * @param startAfterServerState
+ * @param startAfterState
* Starting point for each ReplicaDB cursor. If any CSN for a
* replicaDB is null, then start from the oldest CSN for this
* replicaDB
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
+ * @see #getCursorFrom(DN, int, CSN)
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
- ServerState startAfterServerState) throws ChangelogException;
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+ throws ChangelogException;
/**
* Generates a {@link DBCursor} for one replicaDB for the specified
@@ -136,6 +159,14 @@
throws ChangelogException;
/**
+ * Unregisters the provided cursor from this replication domain.
+ *
+ * @param cursor
+ * the cursor to unregister.
+ */
+ void unregisterCursor(DBCursor<?> cursor);
+
+ /**
* Publishes the provided change to the changelog DB for the specified
* serverId and replication domain. After a change has been successfully
* published, it becomes available to be returned by the External ChangeLog.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7592fd9..a194d6d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,7 +46,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
+import org.opends.server.replication.server.changelog.je.DomainDBCursor;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
@@ -64,6 +66,7 @@
*/
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
+ /** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
/**
@@ -77,11 +80,18 @@
* <li>then check it's not null</li>
* <li>then close all inside</li>
* </ol>
- * When creating a FileReplicaDB, synchronize on the domainMap to avoid
+ * When creating a replicaDB, synchronize on the domainMap to avoid
* concurrent shutdown.
*/
- private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>>
- domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+ /**
+ * \@GuardedBy("itself")
+ */
+ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+ new HashMap<DN, List<DomainDBCursor>>();
+ private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+ new CopyOnWriteArrayList<MultiDomainDBCursor>();
private ReplicationEnvironment replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -124,10 +134,10 @@
* if a problem occurs opening the supplied directory
*/
public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
- throws ConfigException
+ throws ConfigException
{
- this.replicationServer = replicationServer;
this.config = config;
+ this.replicationServer = replicationServer;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
@@ -175,8 +185,7 @@
* the serverId for which to create a ReplicaDB
* @param server
* the ReplicationServer
- * @return a Pair with the FileReplicaDB and a boolean indicating whether it had
- * to be created
+ * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
* @throws ChangelogException
* if a problem occurred with the database
*/
@@ -189,6 +198,19 @@
final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
if (result != null)
{
+ final Boolean dbWasCreated = result.getSecond();
+ if (dbWasCreated)
+ { // new replicaDB => update all cursors with it
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (DomainDBCursor cursor : cursors)
+ {
+ cursor.addReplicaDB(serverId, null);
+ }
+ }
+ }
+
return result;
}
}
@@ -214,20 +236,26 @@
// there was already a value associated to the key, let's use it
return previousValue;
}
+
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
return newValue;
}
private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
{
- // happy path: the FileReplicaDB already exists
+ // happy path: the replicaDB already exists
FileReplicaDB currentValue = domainMap.get(serverId);
if (currentValue != null)
{
return Pair.of(currentValue, false);
}
- // unlucky, the FileReplicaDB does not exist: take the hit and synchronize
+ // unlucky, the replicaDB does not exist: take the hit and synchronize
// on the domainMap to create a new ReplicaDB
synchronized (domainMap)
{
@@ -242,7 +270,7 @@
// The domainMap could have been concurrently removed because
// 1) a shutdown was initiated or 2) an initialize was called.
// Return will allow the code to:
- // 1) shutdown properly or 2) lazily recreate the FileReplicaDB
+ // 1) shutdown properly or 2) lazily recreate the replicaDB
return null;
}
@@ -371,6 +399,7 @@
{
// do nothing: we are already shutting down
}
+
replicationEnv.shutdown();
}
@@ -381,10 +410,10 @@
}
/**
- * Clears all records from the changelog (does not remove the log itself).
+ * Clears all records from the changelog (does not remove the changelog itself).
*
* @throws ChangelogException
- * If an error occurs when clearing the log.
+ * If an error occurs when clearing the changelog.
*/
public void clearDB() throws ChangelogException
{
@@ -629,40 +658,57 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
- throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
{
- final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
- final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
- for (int serverId : serverIds)
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ registeredMultiDomainCursors.add(cursor);
+ for (DN baseDN : domainToReplicaDBs.keySet())
{
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
- final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
- replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
- cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+ cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
}
- // recycle exhausted cursors,
- // because client code will not manage the cursors itself
- return new CompositeDBCursor<Void>(cursors, true);
+ return cursor;
}
- private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
- ServerState startAfterServerState)
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+ throws ChangelogException
{
- final ServerState domainState = offlineReplicas.getServerState(baseDN);
- if (domainState != null)
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ for (int serverId : getDomainMap(baseDN).keySet())
{
- for (CSN offlineCSN : domainState)
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ cursor.addReplicaDB(serverId, lastCSN);
+ }
+ return cursor;
+ }
+
+ private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ {
+ synchronized (registeredDomainCursors)
+ {
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors == null)
{
- if (serverId == offlineCSN.getServerId()
- && !startAfterServerState.cover(offlineCSN))
- {
- return offlineCSN;
- }
+ cursors = new ArrayList<DomainDBCursor>();
+ registeredDomainCursors.put(baseDN, cursors);
}
+ cursors.add(cursor);
+ return cursor;
+ }
+ }
+
+ private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+ {
+ final MultiDomainServerState offlineReplicas =
+ replicationEnv.getChangelogState().getOfflineReplicas();
+ final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+ if (offlineCSN != null
+ && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+ {
+ return offlineCSN;
}
return null;
}
@@ -675,28 +721,55 @@
final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- return replicaDB.generateCursorFrom(startAfterCSN);
+ final DBCursor<UpdateMsg> cursor =
+ replicaDB.generateCursorFrom(startAfterCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ // TODO JNR if (offlineCSN != null) ??
+ // What about replicas that suddenly become offline?
+ return new ReplicaOfflineCursor(cursor, offlineCSN);
}
return EMPTY_CURSOR_REPLICA_DB;
}
/** {@inheritDoc} */
@Override
+ public void unregisterCursor(final DBCursor<?> cursor)
+ {
+ if (cursor instanceof MultiDomainDBCursor)
+ {
+ registeredMultiDomainCursors.remove(cursor);
+ }
+ else if (cursor instanceof DomainDBCursor)
+ {
+ final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+ synchronized (registeredMultiDomainCursors)
+ {
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
{
+ final CSN csn = updateMsg.getCSN();
final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
- updateMsg.getCSN().getServerId(), replicationServer);
+ csn.getServerId(), replicationServer);
final FileReplicaDB replicaDB = pair.getFirst();
- final boolean wasCreated = pair.getSecond();
-
replicaDB.add(updateMsg);
+
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
- notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+ notifyReplicaOnline(indexer, baseDN, csn.getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
- return wasCreated;
+ return pair.getSecond(); // replica DB was created
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index deacf2a..68b9688 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,15 +29,8 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.messages.Message;
@@ -47,18 +40,15 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
-import org.opends.server.util.StaticUtils;
-
-import com.forgerock.opendj.util.Pair;
/**
* Thread responsible for inserting replicated changes into the ChangeNumber
@@ -84,7 +74,7 @@
private ChangelogState changelogState;
/*
- * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
+ * The following MultiDomainServerState fields must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
* updates 2) many updates can happen concurrently.
*/
@@ -130,39 +120,7 @@
*
* @NonNull
*/
- @SuppressWarnings("unchecked")
- private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
- new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
-
- /**
- * New cursors for this Map must be created from the {@link #run()} method,
- * i.e. from the same thread that will make use of them. If this rule is not
- * obeyed, then a JE exception will be thrown about
- * "Non-transactional Cursors may not be used in multiple threads;".
- */
- private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
- new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
- /**
- * Holds the newCursors that will have to be created in the next iteration
- * inside the {@link #run()} method.
- * <p>
- * This map can be updated by multiple threads.
- */
- private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
- new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
- new Comparator<Pair<DN, Integer>>()
- {
- @Override
- public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
- {
- final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
- if (compareBaseDN == 0)
- {
- return o1.getSecond().compareTo(o2.getSecond());
- }
- return compareBaseDN;
- }
- });
+ private MultiDomainDBCursor nextChangeForInsertDBCursor;
/**
* Builds a ChangeNumberIndexer object.
@@ -232,11 +190,8 @@
return;
}
- final CSN csn = updateMsg.getCSN();
- // only keep the oldest CSN that will be the new cursor's starting point
- newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
final CSN oldestCSNBefore = getOldestLastAliveCSN();
- lastAliveCSNs.update(baseDN, csn);
+ lastAliveCSNs.update(baseDN, updateMsg.getCSN());
tryNotify(oldestCSNBefore);
}
@@ -381,28 +336,25 @@
for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
- if (!isECLEnabledDomain(baseDN))
+ if (isECLEnabledDomain(baseDN))
{
- continue;
- }
+ for (Integer serverId : entry.getValue())
+ {
+ /*
+ * initialize with the oldest possible CSN in order for medium
+ * consistency to wait for all replicas to be alive before moving
+ * forward
+ */
+ lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
+ }
- for (Integer serverId : entry.getValue())
- {
- /*
- * initialize with the oldest possible CSN in order for medium
- * consistency to wait for all replicas to be alive before moving
- * forward
- */
- lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
- // start after the actual CSN when initializing from the previous cookie
- final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
- ensureCursorExists(baseDN, serverId, csn);
+ ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+ lastAliveCSNs.update(baseDN, latestKnownState);
}
-
- ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
- lastAliveCSNs.update(baseDN, latestKnownState);
}
- resetNextChangeForInsertDBCursor();
+
+ nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+ nextChangeForInsertDBCursor.next();
if (newestRecord != null)
{
@@ -445,68 +397,6 @@
return new CSN(0, 0, serverId);
}
- private void resetNextChangeForInsertDBCursor() throws ChangelogException
- {
- final Map<DBCursor<UpdateMsg>, DN> cursors =
- new HashMap<DBCursor<UpdateMsg>, DN>();
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
- : this.allCursors.entrySet())
- {
- for (Entry<Integer, DBCursor<UpdateMsg>> entry2
- : entry.getValue().entrySet())
- {
- cursors.put(entry2.getValue(), entry.getKey());
- }
- }
-
- // CNIndexer manages the cursor itself,
- // so do not try to recycle exhausted cursors
- CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
- result.next();
- nextChangeForInsertDBCursor = result;
- }
-
- private boolean ensureCursorExists(DN baseDN, Integer serverId,
- CSN startAfterCSN) throws ChangelogException
- {
- Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
- if (map == null)
- {
- map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
- allCursors.put(baseDN, map);
- }
- DBCursor<UpdateMsg> cursor = map.get(serverId);
- if (cursor == null)
- {
- final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
- cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
- cursor.next();
- map.put(serverId, cursor);
- return false;
- }
- return true;
- }
-
- /**
- * Returns the immediately preceding CSN.
- *
- * @param csn
- * the CSN to use
- * @return the immediately preceding CSN or null if the provided CSN is null.
- */
- CSN getPrecedingCSN(CSN csn)
- {
- if (csn == null)
- {
- return null;
- }
- if (csn.getSeqnum() > 0)
- {
- return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
- }
- return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
- }
-
/** {@inheritDoc} */
@Override
public void initiateShutdown()
@@ -535,28 +425,18 @@
{
try
{
- if (!domainsToClear.isEmpty())
+ while (!domainsToClear.isEmpty())
{
- while (!domainsToClear.isEmpty())
- {
- final DN baseDNToClear = domainsToClear.first();
- removeCursors(baseDNToClear);
- // Only release the waiting thread
- // once this domain's state has been cleared.
- domainsToClear.remove(baseDNToClear);
- }
- resetNextChangeForInsertDBCursor();
- }
- else
- {
- final boolean createdCursors = createNewCursors();
- final boolean recycledCursors = recycleExhaustedCursors();
- if (createdCursors || recycledCursors)
- {
- resetNextChangeForInsertDBCursor();
- }
+ final DN baseDNToClear = domainsToClear.first();
+ nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+ // Only release the waiting thread
+ // once this domain's state has been cleared.
+ domainsToClear.remove(baseDNToClear);
}
+ // Do not call DBCursor.next() here
+ // because we might not have consumed the last record,
+ // for example if we could not move the MCP forward
final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
if (msg == null)
{
@@ -568,8 +448,13 @@
}
wait();
}
- // loop to check whether new changes have been added to the
- // ReplicaDBs
+ // check whether new changes have been added to the ReplicaDBs
+ nextChangeForInsertDBCursor.next();
+ continue;
+ }
+ else if (msg instanceof ReplicaOfflineMsg)
+ {
+ nextChangeForInsertDBCursor.next();
continue;
}
@@ -615,39 +500,44 @@
}
catch (RuntimeException e)
{
- // Nothing can be done about it.
- // Rely on the DirectoryThread uncaught exceptions handler
- // for logging error + alert.
- // Message logged here gives corrective information to the administrator.
- Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
- getClass().getSimpleName(), stackTraceToSingleLineString(e));
- TRACER.debugError(msg.toString());
+ logUnexpectedException(e);
+ // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
throw e;
}
catch (Exception e)
{
- // Nothing can be done about it.
- // Rely on the DirectoryThread uncaught exceptions handler
- // for logging error + alert.
- // Message logged here gives corrective information to the administrator.
- Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
- getClass().getSimpleName(), stackTraceToSingleLineString(e));
- TRACER.debugError(msg.toString());
+ logUnexpectedException(e);
+ // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
throw new RuntimeException(e);
}
finally
{
- removeCursors(DN.NULL_DN);
+ nextChangeForInsertDBCursor.close();
+ nextChangeForInsertDBCursor = null;
}
}
+ /**
+ * Nothing can be done about it.
+ * <p>
+ * Rely on the DirectoryThread uncaught exceptions handler for logging error +
+ * alert.
+ * <p>
+ * Message logged here gives corrective information to the administrator.
+ */
+ private void logUnexpectedException(Exception e)
+ {
+ Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+ getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ TRACER.debugError(msg.toString());
+ }
+
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
- boolean callNextOnCursor = true;
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -660,133 +550,22 @@
}
else if (offlineCSN.isOlderThan(mcCSN))
{
- Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
- pair = getCursor(mcBaseDN, mcCSN.getServerId());
- Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
- if (iter != null && !iter.hasNext())
- {
- /*
- * replica is not back online, Medium consistency point has gone past
- * its last offline time, and there are no more changes after the
- * offline CSN in the cursor: remove everything known about it:
- * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
- * this replica from the medium consistency RUV.
- */
- iter.remove();
- StaticUtils.close(pair.getFirst());
- resetNextChangeForInsertDBCursor();
- callNextOnCursor = false;
- lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
- }
+ /*
+ * replica is not back online, Medium consistency point has gone past
+ * its last offline time, and there are no more changes after the
+ * offline CSN in the cursor: remove everything known about it:
+ * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+ * this replica from the medium consistency RUV.
+ */
+ // TODO JNR how to close cursor for offline replica?
+ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
- if (callNextOnCursor)
- {
- // advance the cursor we just read from,
- // success/failure will be checked later
- nextChangeForInsertDBCursor.next();
- }
- }
-
- private void removeCursors(DN baseDN)
- {
- if (nextChangeForInsertDBCursor != null)
- {
- nextChangeForInsertDBCursor.close();
- nextChangeForInsertDBCursor = null;
- }
- if (DN.NULL_DN.equals(baseDN))
- {
- // close all cursors
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
- {
- StaticUtils.close(map.values());
- }
- allCursors.clear();
- newCursors.clear();
- }
- else
- {
- // close cursors for this DN
- final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
- if (map != null)
- {
- StaticUtils.close(map.values());
- }
- for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
- {
- if (it.next().getFirst().equals(baseDN))
- {
- it.remove();
- }
- }
- }
- }
-
- private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
- getCursor(final DN baseDN, final int serverId) throws ChangelogException
- {
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
- : allCursors.entrySet())
- {
- if (baseDN.equals(entry1.getKey()))
- {
- for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
- entry1.getValue().entrySet().iterator(); iter.hasNext();)
- {
- final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
- if (serverId == entry2.getKey())
- {
- return Pair.of(entry2.getValue(), iter);
- }
- }
- }
- }
- return Pair.empty();
- }
-
- private boolean recycleExhaustedCursors() throws ChangelogException
- {
- boolean succesfullyRecycled = false;
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
- {
- for (DBCursor<UpdateMsg> cursor : map.values())
- {
- // try to recycle it by calling next()
- if (cursor.getRecord() == null && cursor.next())
- {
- succesfullyRecycled = true;
- }
- }
- }
- return succesfullyRecycled;
- }
-
- private boolean createNewCursors() throws ChangelogException
- {
- if (!newCursors.isEmpty())
- {
- boolean newCursorAdded = false;
- for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
- newCursors.entrySet().iterator(); iter.hasNext();)
- {
- final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
- final DN baseDN = entry.getKey().getFirst();
- final CSN csn = entry.getValue();
- // start after preceding CSN so the first CSN read will exactly be the
- // current one
- final CSN startFromCSN = getPrecedingCSN(csn);
- if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
- {
- newCursorAdded = true;
- }
- iter.remove();
- }
- return newCursorAdded;
- }
- return false;
+ // advance the cursor we just read from,
+ // success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index ccd27c0..7f271c1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
* @param <Data>
* The type of data associated with each cursor
*/
-public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
*/
private byte state = UNINITIALIZED;
- /** Whether this composite should try to recycle exhausted cursors. */
- private final boolean recycleExhaustedCursors;
/**
* These cursors are considered exhausted because they had no new changes the
* last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
/**
* The cursors are sorted based on the current change of each cursor to
* consider the next change across all available cursors.
+ * <p>
+ * New cursors for this Map must be created from the same thread that will
+ * make use of them. When this rule is not obeyed, a JE exception will be
+ * thrown about
+ * "Non-transactional Cursors may not be used in multiple threads;".
*/
- private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+ private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
new TreeMap<DBCursor<UpdateMsg>, Data>(
new Comparator<DBCursor<UpdateMsg>>()
{
@@ -81,25 +84,6 @@
}
});
- /**
- * Builds a CompositeDBCursor using the provided collection of cursors.
- *
- * @param cursors
- * the cursors that will be iterated upon.
- * @param recycleExhaustedCursors
- * whether a call to {@link #next()} tries to recycle exhausted
- * cursors
- */
- public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
- boolean recycleExhaustedCursors)
- {
- this.recycleExhaustedCursors = recycleExhaustedCursors;
- for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
- {
- put(entry);
- }
- }
-
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
@@ -108,51 +92,75 @@
{
return false;
}
- final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
- state = READY;
- if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
+
+ if (state == UNINITIALIZED)
{
- // try to recycle empty cursors in case the underlying ReplicaDBs received
- // new changes.
+ state = READY;
+ }
+ else
+ {
+ // Previous state was READY => we must advance the first cursor
+ // because the UpdateMsg it is pointing has already been consumed.
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove the first cursor, then add it again after moving it forward.
+ final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry();
+ if (cursorToAdvance != null)
+ {
+ addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
+ }
+ }
+
+ recycleExhaustedCursors();
+ removeNoLongerNeededCursors();
+ incorporateNewCursors();
+ return !cursors.isEmpty();
+ }
+
+ private void recycleExhaustedCursors() throws ChangelogException
+ {
+ if (!exhaustedCursors.isEmpty())
+ {
+ // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
final Map<DBCursor<UpdateMsg>, Data> copy =
new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
{
- entry.getKey().next();
- put(entry);
- }
- final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
- if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
- {
- // if the first cursor was previously an exhausted cursor,
- // then we have already called next() on it.
- // Avoid calling it again because we know new changes have been found.
- return true;
+ addCursor(entry.getKey(), entry.getValue());
}
}
-
- // To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and add again the cursor after moving it forward.
- if (advanceNonExhaustedCursors)
- {
- Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
- if (firstEntry != null)
- {
- final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
- cursor.next();
- put(firstEntry);
- }
- }
- // no cursors are left with changes.
- return !cursors.isEmpty();
}
- private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
+ private void removeNoLongerNeededCursors()
{
- final DBCursor<UpdateMsg> cursor = entry.getKey();
- final Data data = entry.getValue();
- if (cursor.getRecord() != null)
+ for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
+ cursors.entrySet().iterator(); iterator.hasNext();)
+ {
+ final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
+ final Data data = entry.getValue();
+ if (isCursorNoLongerNeededFor(data))
+ {
+ entry.getKey().close();
+ iterator.remove();
+ cursorRemoved(data);
+ }
+ }
+ }
+
+ /**
+ * Adds a cursor to this composite cursor. It first calls
+ * {@link DBCursor#next()} to verify whether it is exhausted or not.
+ *
+ * @param cursor
+ * the cursor to add to this composite
+ * @param data
+ * the data associated to the provided cursor
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
+ {
+ if (cursor.next())
{
this.cursors.put(cursor, data);
}
@@ -166,6 +174,8 @@
@Override
public UpdateMsg getRecord()
{
+ // Cannot call incorporateNewCursors() here because
+ // somebody might have already called DBCursor.getRecord() and read the record
final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
if (entry != null)
{
@@ -175,6 +185,33 @@
}
/**
+ * Called when implementors should incorporate new cursors into the current
+ * composite DBCursor. Implementors should call
+ * {@link #addCursor(DBCursor, Object)} to do so.
+ *
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ protected abstract void incorporateNewCursors() throws ChangelogException;
+
+ /**
+ * Returns whether the cursor associated to the provided data should be removed.
+ *
+ * @param data the data associated to the cursor to be tested
+ * @return true if the cursor associated to the provided data should be removed,
+ * false otherwise
+ */
+ protected abstract boolean isCursorNoLongerNeededFor(Data data);
+
+ /**
+ * Notifies that the cursor associated to the provided data has been removed.
+ *
+ * @param data
+ * the data associated to the removed cursor
+ */
+ protected abstract void cursorRemoved(Data data);
+
+ /**
* Returns the data associated to the cursor that returned the current record.
*
* @return the data associated to the cursor that returned the current record.
@@ -193,8 +230,11 @@
@Override
public void close()
{
+ state = CLOSED;
StaticUtils.close(cursors.keySet());
StaticUtils.close(exhaustedCursors.keySet());
+ cursors.clear();
+ exhaustedCursors.clear();
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
new file mode 100644
index 0000000..04620c5
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -0,0 +1,132 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a replication domain's replica DBs.
+ */
+public class DomainDBCursor extends CompositeDBCursor<Void>
+{
+
+ private final DN baseDN;
+ private final ReplicationDomainDB domainDB;
+
+ private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
+ new ConcurrentSkipListMap<Integer, CSN>();
+ /**
+ * Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
+ */
+ private static final CSN NULL_CSN = new CSN(0, 0, 0);
+
+ /**
+ * Builds a DomainDBCursor instance.
+ *
+ * @param baseDN
+ * the replication domain baseDN of this cursor
+ * @param domainDB
+ * the DB for the provided replication domain
+ */
+ public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+ {
+ this.baseDN = baseDN;
+ this.domainDB = domainDB;
+ }
+
+ /**
+ * Returns the replication domain baseDN of this cursor.
+ *
+ * @return the replication domain baseDN of this cursor.
+ */
+ public DN getBaseDN()
+ {
+ return baseDN;
+ }
+
+ /**
+ * Adds a replicaDB for this cursor to iterate over. Added cursors will be
+ * created and iterated over on the next call to {@link #next()}.
+ *
+ * @param serverId
+ * the serverId of the replica
+ * @param startAfterCSN
+ * the CSN after which to start iterating
+ */
+ public void addReplicaDB(int serverId, CSN startAfterCSN)
+ {
+ // only keep the oldest CSN that will be the new cursor's starting point
+ newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void incorporateNewCursors() throws ChangelogException
+ {
+ for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();)
+ {
+ final Entry<Integer, CSN> pair = iter.next();
+ final int serverId = pair.getKey();
+ final CSN csn = pair.getValue();
+ final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
+ final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ addCursor(cursor, null);
+ iter.remove();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected boolean isCursorNoLongerNeededFor(Void data)
+ {
+ return false; // Not needed
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void cursorRemoved(Void data)
+ {
+ // Not used so far
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ super.close();
+ domainDB.unregisterCursor(this);
+ newReplicas.clear();
+ }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 723b682..4ebee02 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,10 +29,10 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
@@ -76,13 +76,20 @@
* <li>then check it's not null</li>
* <li>then close all inside</li>
* </ol>
- * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+ * When creating a replicaDB, synchronize on the domainMap to avoid
* concurrent shutdown.
*/
- private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
- domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
- private ReplicationDbEnv dbEnv;
- private ReplicationServerCfg config;
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+ /**
+ * \@GuardedBy("itself")
+ */
+ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+ new HashMap<DN, List<DomainDBCursor>>();
+ private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+ new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private ReplicationDbEnv replicationEnv;
+ private final ReplicationServerCfg config;
private final File dbDirectory;
/**
@@ -107,9 +114,9 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
- private AtomicBoolean shutdown = new AtomicBoolean();
+ private final AtomicBoolean shutdown = new AtomicBoolean();
- private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+ private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
new DBCursor<UpdateMsg>()
{
@@ -139,7 +146,7 @@
};
/**
- * Builds an instance of this class.
+ * Creates a new changelog DB.
*
* @param replicationServer
* the local replication server.
@@ -148,15 +155,15 @@
* @throws ConfigException
* if a problem occurs opening the supplied directory
*/
- public JEChangelogDB(ReplicationServer replicationServer,
- ReplicationServerCfg config) throws ConfigException
+ public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+ throws ConfigException
{
this.config = config;
this.replicationServer = replicationServer;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
- private File makeDir(String dbDirName) throws ConfigException
+ private File makeDir(final String dbDirName) throws ConfigException
{
// Check that this path exists or create it.
final File dbDirectory = getFileForPath(dbDirName);
@@ -173,12 +180,9 @@
if (debugEnabled())
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- final MessageBuilder mb = new MessageBuilder();
- mb.append(e.getLocalizedMessage());
- mb.append(" ");
- mb.append(String.valueOf(dbDirectory));
- Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
- throw new ConfigException(msg, e);
+ final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
+ .append(String.valueOf(dbDirectory));
+ throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
}
}
@@ -223,35 +227,42 @@
* the serverId for which to create a ReplicaDB
* @param server
* the ReplicationServer
- * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
- * to be created
+ * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
* @throws ChangelogException
* if a problem occurred with the database
*/
- Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
- int serverId, ReplicationServer server) throws ChangelogException
+ Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+ final ReplicationServer server) throws ChangelogException
{
while (!shutdown.get())
{
- final ConcurrentMap<Integer, JEReplicaDB> domainMap =
- getExistingOrNewDomainMap(baseDN);
- final Pair<JEReplicaDB, Boolean> result =
- getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+ final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
if (result != null)
{
+ final Boolean dbWasCreated = result.getSecond();
+ if (dbWasCreated)
+ { // new replicaDB => update all cursors with it
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (DomainDBCursor cursor : cursors)
+ {
+ cursor.addReplicaDB(serverId, null);
+ }
+ }
+ }
+
return result;
}
}
- throw new ChangelogException(
- ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+ throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
- private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
- DN baseDN)
+ private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
{
// happy path: the domainMap already exists
- final ConcurrentMap<Integer, JEReplicaDB> currentValue =
- domainToReplicaDBs.get(baseDN);
+ final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
if (currentValue != null)
{
return currentValue;
@@ -260,30 +271,33 @@
// unlucky, the domainMap does not exist: take the hit and create the
// newValue, even though the same could be done concurrently by another
// thread
- final ConcurrentMap<Integer, JEReplicaDB> newValue =
- new ConcurrentHashMap<Integer, JEReplicaDB>();
- final ConcurrentMap<Integer, JEReplicaDB> previousValue =
- domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+ final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+ final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
if (previousValue != null)
{
// there was already a value associated to the key, let's use it
return previousValue;
}
+
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
return newValue;
}
- private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
- final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
- DN baseDN, ReplicationServer server) throws ChangelogException
+ private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+ final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
{
- // happy path: the JEReplicaDB already exists
+ // happy path: the replicaDB already exists
JEReplicaDB currentValue = domainMap.get(serverId);
if (currentValue != null)
{
return Pair.of(currentValue, false);
}
- // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+ // unlucky, the replicaDB does not exist: take the hit and synchronize
// on the domainMap to create a new ReplicaDB
synchronized (domainMap)
{
@@ -299,11 +313,11 @@
// The domainMap could have been concurrently removed because
// 1) a shutdown was initiated or 2) an initialize was called.
// Return will allow the code to:
- // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+ // 1) shutdown properly or 2) lazily recreate the replicaDB
return null;
}
- final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+ final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
domainMap.put(serverId, newDB);
return Pair.of(newDB, true);
}
@@ -316,8 +330,8 @@
try
{
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
- dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
- final ChangelogState changelogState = dbEnv.getChangelogState();
+ replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+ final ChangelogState changelogState = replicationEnv.getChangelogState();
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
@@ -351,7 +365,7 @@
}
}
- private void shutdownCNIndexDB() throws ChangelogException
+ private void shutdownChangeNumberIndexDB() throws ChangelogException
{
synchronized (cnIndexDBLock)
{
@@ -389,7 +403,7 @@
try
{
- shutdownCNIndexDB();
+ shutdownChangeNumberIndexDB();
}
catch (ChangelogException e)
{
@@ -410,7 +424,7 @@
}
}
- if (dbEnv != null)
+ if (replicationEnv != null)
{
// wait for shutdown of the threads holding cursors
try
@@ -429,7 +443,7 @@
// do nothing: we are already shutting down
}
- dbEnv.shutdown();
+ replicationEnv.shutdown();
}
if (firstException != null)
@@ -439,11 +453,10 @@
}
/**
- * Clears all content from the changelog database, but leaves its directory on
- * the filesystem.
+ * Clears all records from the changelog (does not remove the changelog itself).
*
* @throws ChangelogException
- * If a database problem happened
+ * If an error occurs when clearing the changelog.
*/
public void clearDB() throws ChangelogException
{
@@ -477,7 +490,7 @@
try
{
- shutdownCNIndexDB();
+ shutdownChangeNumberIndexDB();
}
catch (ChangelogException e)
{
@@ -593,7 +606,7 @@
// 3- clear the changelogstate DB
try
{
- dbEnv.clearGenerationId(baseDN);
+ replicationEnv.clearGenerationId(baseDN);
}
catch (ChangelogException e)
{
@@ -644,7 +657,7 @@
{
if (computeChangeNumber)
{
- startIndexer(dbEnv.getChangelogState());
+ startIndexer(replicationEnv.getChangelogState());
}
else
{
@@ -682,7 +695,7 @@
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+ cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
}
catch (Exception e)
{
@@ -704,75 +717,118 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
- throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
{
- final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
- final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
- for (int serverId : serverIds)
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ registeredMultiDomainCursors.add(cursor);
+ for (DN baseDN : domainToReplicaDBs.keySet())
{
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
- final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
- replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
- cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+ cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
}
- // recycle exhausted cursors,
- // because client code will not manage the cursors itself
- return new CompositeDBCursor<Void>(cursors, true);
+ return cursor;
}
- private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
- ServerState startAfterServerState)
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+ throws ChangelogException
{
- final ServerState domainState = offlineReplicas.getServerState(baseDN);
- if (domainState != null)
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ for (int serverId : getDomainMap(baseDN).keySet())
{
- for (CSN offlineCSN : domainState)
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ cursor.addReplicaDB(serverId, lastCSN);
+ }
+ return cursor;
+ }
+
+ private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ {
+ synchronized (registeredDomainCursors)
+ {
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors == null)
{
- if (serverId == offlineCSN.getServerId()
- && !startAfterServerState.cover(offlineCSN))
- {
- return offlineCSN;
- }
+ cursors = new ArrayList<DomainDBCursor>();
+ registeredDomainCursors.put(baseDN, cursors);
}
+ cursors.add(cursor);
+ return cursor;
+ }
+ }
+
+ private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+ {
+ final MultiDomainServerState offlineReplicas =
+ replicationEnv.getChangelogState().getOfflineReplicas();
+ final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+ if (offlineCSN != null
+ && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+ {
+ return offlineCSN;
}
return null;
}
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
throws ChangelogException
{
JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- return replicaDB.generateCursorFrom(startAfterCSN);
+ final DBCursor<UpdateMsg> cursor =
+ replicaDB.generateCursorFrom(startAfterCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ // TODO JNR if (offlineCSN != null) ??
+ // What about replicas that suddenly become offline?
+ return new ReplicaOfflineCursor(cursor, offlineCSN);
}
- return EMPTY_CURSOR;
+ return EMPTY_CURSOR_REPLICA_DB;
}
/** {@inheritDoc} */
@Override
- public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
- throws ChangelogException
+ public void unregisterCursor(final DBCursor<?> cursor)
{
- final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
- updateMsg.getCSN().getServerId(), replicationServer);
- final JEReplicaDB replicaDB = pair.getFirst();
- final boolean wasCreated = pair.getSecond();
+ if (cursor instanceof MultiDomainDBCursor)
+ {
+ registeredMultiDomainCursors.remove(cursor);
+ }
+ else if (cursor instanceof DomainDBCursor)
+ {
+ final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+ synchronized (registeredMultiDomainCursors)
+ {
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ }
+ /** {@inheritDoc} */
+ @Override
+ public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+ {
+ final CSN csn = updateMsg.getCSN();
+ final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+ csn.getServerId(), replicationServer);
+ final JEReplicaDB replicaDB = pair.getFirst();
replicaDB.add(updateMsg);
+
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
- notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+ notifyReplicaOnline(indexer, baseDN, csn.getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
- return wasCreated;
+ return pair.getSecond(); // replica DB was created
}
/** {@inheritDoc} */
@@ -792,7 +848,7 @@
{
if (indexer.isReplicaOffline(baseDN, serverId))
{
- dbEnv.notifyReplicaOnline(baseDN, serverId);
+ replicationEnv.notifyReplicaOnline(baseDN, serverId);
}
}
@@ -800,7 +856,7 @@
@Override
public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
{
- dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
+ replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
new file mode 100644
index 0000000..5e0fcb7
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -0,0 +1,130 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a all the replication domain known to the changelog DB.
+ */
+public class MultiDomainDBCursor extends CompositeDBCursor<DN>
+{
+ private final ReplicationDomainDB domainDB;
+
+ private final ConcurrentSkipListMap<DN, ServerState> newDomains =
+ new ConcurrentSkipListMap<DN, ServerState>();
+ private final ConcurrentSkipListSet<DN> removeDomains =
+ new ConcurrentSkipListSet<DN>();
+
+ /**
+ * Builds a MultiDomainDBCursor instance.
+ *
+ * @param domainDB
+ * the replication domain management DB
+ */
+ public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+ {
+ this.domainDB = domainDB;
+ }
+
+ /**
+ * Adds a replication domain for this cursor to iterate over. Added cursors
+ * will be created and iterated over on the next call to {@link #next()}.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ * @param startAfterState
+ * the {@link ServerState} after which to start iterating
+ */
+ public void addDomain(DN baseDN, ServerState startAfterState)
+ {
+ newDomains.put(baseDN,
+ startAfterState != null ? startAfterState : new ServerState());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void incorporateNewCursors() throws ChangelogException
+ {
+ for (Iterator<Entry<DN, ServerState>> iter = newDomains.entrySet().iterator();
+ iter.hasNext();)
+ {
+ final Entry<DN, ServerState> entry = iter.next();
+ final DN baseDN = entry.getKey();
+ final ServerState serverState = entry.getValue();
+ final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+ addCursor(domainDBCursor, baseDN);
+ iter.remove();
+ }
+ }
+
+ /**
+ * Removes a replication domain from this cursor and stops iterating over it.
+ * Removed cursors will be effectively removed on the next call to
+ * {@link #next()}.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ */
+ public void removeDomain(DN baseDN)
+ {
+ removeDomains.add(baseDN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected boolean isCursorNoLongerNeededFor(DN baseDN)
+ {
+ return removeDomains.contains(baseDN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void cursorRemoved(DN baseDN)
+ {
+ removeDomains.remove(baseDN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ super.close();
+ domainDB.unregisterCursor(this);
+ newDomains.clear();
+ removeDomains.clear();
+ }
+
+}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 4f7c836..5258d28 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -127,7 +127,11 @@
private ChangeNumberIndexDB cnIndexDB;
@Mock
private ReplicationDomainDB domainDB;
- private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
+
+ private List<DN> eclEnabledDomains;
+ private MultiDomainDBCursor multiDomainCursor;
+ private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors;
+ private Map<DN, DomainDBCursor> domainDBCursors;
private ChangelogState initialState;
private Map<DN, ServerState> domainNewestCSNs;
private ChangeNumberIndexer cnIndexer;
@@ -152,13 +156,18 @@
public void setup() throws Exception
{
MockitoAnnotations.initMocks(this);
- when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
- when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+ multiDomainCursor = new MultiDomainDBCursor(domainDB);
initialState = new ChangelogState();
initialCookie = new MultiDomainServerState();
- cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ domainDBCursors = new HashMap<DN, DomainDBCursor>();
domainNewestCSNs = new HashMap<DN, ServerState>();
+
+ when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
+ when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+ when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
+ multiDomainCursor);
}
@AfterMethod
@@ -172,15 +181,17 @@
@Test
public void emptyDBNoDS() throws Exception
{
- startCNIndexer(BASE_DN1);
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneDS() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -191,10 +202,11 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBOneDS() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
addReplica(BASE_DN1, serverId1);
setCNIndexDBInitialRecords(msg1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
@@ -205,9 +217,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSs() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
// simulate messages received out of order
@@ -223,9 +236,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsDifferentDomains() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN2, serverId2);
- startCNIndexer(BASE_DN1, BASE_DN2);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -258,8 +272,9 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -286,12 +301,13 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBTwoDSs() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
setCNIndexDBInitialRecords(msg1, msg2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
@@ -311,9 +327,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
@@ -328,10 +345,11 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(ADMIN_DATA_DN, serverId1);
addReplica(BASE_DN1, serverId2);
addReplica(BASE_DN1, serverId3);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
// cn=admin data will does not participate in the external changelog
@@ -349,8 +367,9 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -370,8 +389,9 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -389,9 +409,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -406,9 +427,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneGoingOffline() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -439,10 +461,11 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -472,12 +495,13 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
// blocked until we receive info for serverId2
assertExternalChangelogContent();
@@ -516,13 +540,14 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
publishUpdateMsg(msg2, msg3);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
// MCP moves forward because serverId1 is not really offline
@@ -539,9 +564,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneKilled() throws Exception
{
+ eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer(BASE_DN1);
+ startCNIndexer();
assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -561,10 +587,26 @@
private void addReplica(DN baseDN, int serverId) throws Exception
{
- final SequentialDBCursor cursor = new SequentialDBCursor();
- cursors.put(Pair.of(baseDN, serverId), cursor);
- when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
- .thenReturn(cursor);
+ final SequentialDBCursor replicaDBCursor = new SequentialDBCursor();
+ replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor);
+
+ if (isECLEnabledDomain2(baseDN))
+ {
+ DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
+ if (domainDBCursor == null)
+ {
+ domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+ domainDBCursors.put(baseDN, domainDBCursor);
+
+ multiDomainCursor.addDomain(baseDN, null);
+ when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+ .thenReturn(domainDBCursor);
+ }
+ domainDBCursor.addReplicaDB(serverId, null);
+ when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+ .thenReturn(replicaDBCursor);
+ }
+
when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
getDomainNewestCSNs(baseDN));
initialState.addServerIdToDomain(serverId, baseDN);
@@ -581,21 +623,26 @@
return serverState;
}
- private void startCNIndexer(DN... eclEnabledDomains)
+ private void startCNIndexer()
{
- final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
{
@Override
protected boolean isECLEnabledDomain(DN baseDN)
{
- return eclEnabledDomainList.contains(baseDN);
+ return isECLEnabledDomain2(baseDN);
}
+
};
cnIndexer.start();
waitForWaitingState(cnIndexer);
}
+ private boolean isECLEnabledDomain2(DN baseDN)
+ {
+ return eclEnabledDomains.contains(baseDN);
+ }
+
private void stopCNIndexer() throws Exception
{
if (cnIndexer != null)
@@ -630,7 +677,8 @@
final CSN csn = newestMsg.getCSN();
when(cnIndexDB.getNewestRecord()).thenReturn(
new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
- final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
+ final SequentialDBCursor cursor =
+ replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
cursor.add(newestMsg);
}
initialCookie.update(msg.getBaseDN(), msg.getCSN());
@@ -642,7 +690,7 @@
for (ReplicatedUpdateMsg msg : msgs)
{
final SequentialDBCursor cursor =
- cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
+ replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
if (msg.isEmptyCursor())
{
cursor.add(null);
@@ -745,11 +793,4 @@
};
}
- @Test(dataProvider = "precedingCSNDataProvider")
- public void getPrecedingCSN(CSN start, CSN expected)
- {
- cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
- CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
- assertThat(precedingCSN).isEqualTo(expected);
- }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index d4c32b5..7c76a7a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,11 +25,7 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.util.HashMap;
-import java.util.Map;
-
import org.opends.server.DirectoryServerTestCase;
-import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -46,6 +42,25 @@
public class CompositeDBCursorTest extends DirectoryServerTestCase
{
+ private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String>
+ {
+ @Override
+ protected void incorporateNewCursors() throws ChangelogException
+ {
+ }
+
+ @Override
+ protected boolean isCursorNoLongerNeededFor(String data)
+ {
+ return false;
+ }
+
+ @Override
+ protected void cursorRemoved(String data)
+ {
+ }
+ }
+
private UpdateMsg msg1;
private UpdateMsg msg2;
private UpdateMsg msg3;
@@ -174,8 +189,6 @@
of(msg4, baseDN1));
}
- // TODO : this test fails because msg2 is returned twice
- @Test(enabled=false)
public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -221,16 +234,12 @@
private CompositeDBCursor<String> newCompositeDBCursor(
Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
{
- final Map<DBCursor<UpdateMsg>, String> cursorsMap =
- new HashMap<DBCursor<UpdateMsg>, String>();
+ final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor();
for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
{
- // The cursors in the composite are expected to be pointing
- // to first record available
- pair.getFirst().next();
- cursorsMap.put(pair.getFirst(), pair.getSecond());
+ cursor.addCursor(pair.getFirst(), pair.getSecond());
}
- return new CompositeDBCursor<String>(cursorsMap, true);
+ return cursor;
}
private void assertInOrder(final CompositeDBCursor<String> compCursor,
--
Gitblit v1.10.0