From b6ccb560e9056cc9c028812f5f63ff2e80c95c87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 359 +++++++++++------------------------------------------------
1 files changed, 69 insertions(+), 290 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index deacf2a..68b9688 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,15 +29,8 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.messages.Message;
@@ -47,18 +40,15 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
-import org.opends.server.util.StaticUtils;
-
-import com.forgerock.opendj.util.Pair;
/**
* Thread responsible for inserting replicated changes into the ChangeNumber
@@ -84,7 +74,7 @@
private ChangelogState changelogState;
/*
- * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
+ * The following MultiDomainServerState fields must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
* updates 2) many updates can happen concurrently.
*/
@@ -130,39 +120,7 @@
*
* @NonNull
*/
- @SuppressWarnings("unchecked")
- private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
- new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
-
- /**
- * New cursors for this Map must be created from the {@link #run()} method,
- * i.e. from the same thread that will make use of them. If this rule is not
- * obeyed, then a JE exception will be thrown about
- * "Non-transactional Cursors may not be used in multiple threads;".
- */
- private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
- new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
- /**
- * Holds the newCursors that will have to be created in the next iteration
- * inside the {@link #run()} method.
- * <p>
- * This map can be updated by multiple threads.
- */
- private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
- new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
- new Comparator<Pair<DN, Integer>>()
- {
- @Override
- public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
- {
- final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
- if (compareBaseDN == 0)
- {
- return o1.getSecond().compareTo(o2.getSecond());
- }
- return compareBaseDN;
- }
- });
+ private MultiDomainDBCursor nextChangeForInsertDBCursor;
/**
* Builds a ChangeNumberIndexer object.
@@ -232,11 +190,8 @@
return;
}
- final CSN csn = updateMsg.getCSN();
- // only keep the oldest CSN that will be the new cursor's starting point
- newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
final CSN oldestCSNBefore = getOldestLastAliveCSN();
- lastAliveCSNs.update(baseDN, csn);
+ lastAliveCSNs.update(baseDN, updateMsg.getCSN());
tryNotify(oldestCSNBefore);
}
@@ -381,28 +336,25 @@
for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
- if (!isECLEnabledDomain(baseDN))
+ if (isECLEnabledDomain(baseDN))
{
- continue;
- }
+ for (Integer serverId : entry.getValue())
+ {
+ /*
+ * initialize with the oldest possible CSN in order for medium
+ * consistency to wait for all replicas to be alive before moving
+ * forward
+ */
+ lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
+ }
- for (Integer serverId : entry.getValue())
- {
- /*
- * initialize with the oldest possible CSN in order for medium
- * consistency to wait for all replicas to be alive before moving
- * forward
- */
- lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
- // start after the actual CSN when initializing from the previous cookie
- final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
- ensureCursorExists(baseDN, serverId, csn);
+ ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+ lastAliveCSNs.update(baseDN, latestKnownState);
}
-
- ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
- lastAliveCSNs.update(baseDN, latestKnownState);
}
- resetNextChangeForInsertDBCursor();
+
+ nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+ nextChangeForInsertDBCursor.next();
if (newestRecord != null)
{
@@ -445,68 +397,6 @@
return new CSN(0, 0, serverId);
}
- private void resetNextChangeForInsertDBCursor() throws ChangelogException
- {
- final Map<DBCursor<UpdateMsg>, DN> cursors =
- new HashMap<DBCursor<UpdateMsg>, DN>();
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
- : this.allCursors.entrySet())
- {
- for (Entry<Integer, DBCursor<UpdateMsg>> entry2
- : entry.getValue().entrySet())
- {
- cursors.put(entry2.getValue(), entry.getKey());
- }
- }
-
- // CNIndexer manages the cursor itself,
- // so do not try to recycle exhausted cursors
- CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
- result.next();
- nextChangeForInsertDBCursor = result;
- }
-
- private boolean ensureCursorExists(DN baseDN, Integer serverId,
- CSN startAfterCSN) throws ChangelogException
- {
- Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
- if (map == null)
- {
- map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
- allCursors.put(baseDN, map);
- }
- DBCursor<UpdateMsg> cursor = map.get(serverId);
- if (cursor == null)
- {
- final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
- cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
- cursor.next();
- map.put(serverId, cursor);
- return false;
- }
- return true;
- }
-
- /**
- * Returns the immediately preceding CSN.
- *
- * @param csn
- * the CSN to use
- * @return the immediately preceding CSN or null if the provided CSN is null.
- */
- CSN getPrecedingCSN(CSN csn)
- {
- if (csn == null)
- {
- return null;
- }
- if (csn.getSeqnum() > 0)
- {
- return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
- }
- return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
- }
-
/** {@inheritDoc} */
@Override
public void initiateShutdown()
@@ -535,28 +425,18 @@
{
try
{
- if (!domainsToClear.isEmpty())
+ while (!domainsToClear.isEmpty())
{
- while (!domainsToClear.isEmpty())
- {
- final DN baseDNToClear = domainsToClear.first();
- removeCursors(baseDNToClear);
- // Only release the waiting thread
- // once this domain's state has been cleared.
- domainsToClear.remove(baseDNToClear);
- }
- resetNextChangeForInsertDBCursor();
- }
- else
- {
- final boolean createdCursors = createNewCursors();
- final boolean recycledCursors = recycleExhaustedCursors();
- if (createdCursors || recycledCursors)
- {
- resetNextChangeForInsertDBCursor();
- }
+ final DN baseDNToClear = domainsToClear.first();
+ nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+ // Only release the waiting thread
+ // once this domain's state has been cleared.
+ domainsToClear.remove(baseDNToClear);
}
+ // Do not call DBCursor.next() here
+ // because we might not have consumed the last record,
+ // for example if we could not move the MCP forward
final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
if (msg == null)
{
@@ -568,8 +448,13 @@
}
wait();
}
- // loop to check whether new changes have been added to the
- // ReplicaDBs
+ // check whether new changes have been added to the ReplicaDBs
+ nextChangeForInsertDBCursor.next();
+ continue;
+ }
+ else if (msg instanceof ReplicaOfflineMsg)
+ {
+ nextChangeForInsertDBCursor.next();
continue;
}
@@ -615,39 +500,44 @@
}
catch (RuntimeException e)
{
- // Nothing can be done about it.
- // Rely on the DirectoryThread uncaught exceptions handler
- // for logging error + alert.
- // Message logged here gives corrective information to the administrator.
- Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
- getClass().getSimpleName(), stackTraceToSingleLineString(e));
- TRACER.debugError(msg.toString());
+ logUnexpectedException(e);
+ // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
throw e;
}
catch (Exception e)
{
- // Nothing can be done about it.
- // Rely on the DirectoryThread uncaught exceptions handler
- // for logging error + alert.
- // Message logged here gives corrective information to the administrator.
- Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
- getClass().getSimpleName(), stackTraceToSingleLineString(e));
- TRACER.debugError(msg.toString());
+ logUnexpectedException(e);
+ // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
throw new RuntimeException(e);
}
finally
{
- removeCursors(DN.NULL_DN);
+ nextChangeForInsertDBCursor.close();
+ nextChangeForInsertDBCursor = null;
}
}
+ /**
+ * Nothing can be done about it.
+ * <p>
+ * Rely on the DirectoryThread uncaught exceptions handler for logging error +
+ * alert.
+ * <p>
+ * Message logged here gives corrective information to the administrator.
+ */
+ private void logUnexpectedException(Exception e)
+ {
+ Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+ getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ TRACER.debugError(msg.toString());
+ }
+
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
- boolean callNextOnCursor = true;
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -660,133 +550,22 @@
}
else if (offlineCSN.isOlderThan(mcCSN))
{
- Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
- pair = getCursor(mcBaseDN, mcCSN.getServerId());
- Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
- if (iter != null && !iter.hasNext())
- {
- /*
- * replica is not back online, Medium consistency point has gone past
- * its last offline time, and there are no more changes after the
- * offline CSN in the cursor: remove everything known about it:
- * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
- * this replica from the medium consistency RUV.
- */
- iter.remove();
- StaticUtils.close(pair.getFirst());
- resetNextChangeForInsertDBCursor();
- callNextOnCursor = false;
- lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
- }
+ /*
+ * replica is not back online, Medium consistency point has gone past
+ * its last offline time, and there are no more changes after the
+ * offline CSN in the cursor: remove everything known about it:
+ * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+ * this replica from the medium consistency RUV.
+ */
+ // TODO JNR how to close cursor for offline replica?
+ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
- if (callNextOnCursor)
- {
- // advance the cursor we just read from,
- // success/failure will be checked later
- nextChangeForInsertDBCursor.next();
- }
- }
-
- private void removeCursors(DN baseDN)
- {
- if (nextChangeForInsertDBCursor != null)
- {
- nextChangeForInsertDBCursor.close();
- nextChangeForInsertDBCursor = null;
- }
- if (DN.NULL_DN.equals(baseDN))
- {
- // close all cursors
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
- {
- StaticUtils.close(map.values());
- }
- allCursors.clear();
- newCursors.clear();
- }
- else
- {
- // close cursors for this DN
- final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
- if (map != null)
- {
- StaticUtils.close(map.values());
- }
- for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
- {
- if (it.next().getFirst().equals(baseDN))
- {
- it.remove();
- }
- }
- }
- }
-
- private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
- getCursor(final DN baseDN, final int serverId) throws ChangelogException
- {
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
- : allCursors.entrySet())
- {
- if (baseDN.equals(entry1.getKey()))
- {
- for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
- entry1.getValue().entrySet().iterator(); iter.hasNext();)
- {
- final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
- if (serverId == entry2.getKey())
- {
- return Pair.of(entry2.getValue(), iter);
- }
- }
- }
- }
- return Pair.empty();
- }
-
- private boolean recycleExhaustedCursors() throws ChangelogException
- {
- boolean succesfullyRecycled = false;
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
- {
- for (DBCursor<UpdateMsg> cursor : map.values())
- {
- // try to recycle it by calling next()
- if (cursor.getRecord() == null && cursor.next())
- {
- succesfullyRecycled = true;
- }
- }
- }
- return succesfullyRecycled;
- }
-
- private boolean createNewCursors() throws ChangelogException
- {
- if (!newCursors.isEmpty())
- {
- boolean newCursorAdded = false;
- for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
- newCursors.entrySet().iterator(); iter.hasNext();)
- {
- final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
- final DN baseDN = entry.getKey().getFirst();
- final CSN csn = entry.getValue();
- // start after preceding CSN so the first CSN read will exactly be the
- // current one
- final CSN startFromCSN = getPrecedingCSN(csn);
- if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
- {
- newCursorAdded = true;
- }
- iter.remove();
- }
- return newCursorAdded;
- }
- return false;
+ // advance the cursor we just read from,
+ // success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
}
/**
--
Gitblit v1.10.0