From 891159050af4aa3fe47c67e3ba7d3f21299027a4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 02 Dec 2013 14:01:32 +0000
Subject: [PATCH] OPENDJ-1174 (CR-2631) Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 292 ++++++++++++++++++++++++++++++++++++++-------------------
1 files changed, 193 insertions(+), 99 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 93b8cd3..ce0f7b4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -30,6 +30,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
@@ -43,6 +44,9 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -57,6 +61,11 @@
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
+ /**
+ * If this is true, then the {@link #run()} method must clear its state.
+ * Otherwise the run method executes normally.
+ */
+ private final AtomicBoolean doClear = new AtomicBoolean();
private final ChangelogDB changelogDB;
/** Only used for initialization, and then discarded. */
private ChangelogState changelogState;
@@ -101,12 +110,12 @@
new MultiDomainServerState();
/**
- * Composite cursor across all the replicaDBs for all the replication domains.
- * It is volatile to ensure it supports concurrent update. Each time it is
- * used more than once in a method, the method must take a local copy to
- * ensure the cursor does not get updated in the middle of the method.
+ * Cursor across all the replicaDBs for all the replication domains. It is
+ * positioned on the next change that needs to be inserted in the CNIndexDB.
+ * <p>
+ * Note: it is only accessed from the {@link #run()} method.
*/
- private volatile CompositeDBCursor<DN> crossDomainDBCursor;
+ private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
/**
* New cursors for this Map must be created from the {@link #run()} method,
@@ -116,9 +125,27 @@
*/
private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
- /** This map can be updated by multiple threads. */
- private ConcurrentMap<CSN, DN> newCursors =
- new ConcurrentSkipListMap<CSN, DN>();
+ /**
+ * Holds the newCursors that will have to be created in the next iteration
+ * inside the {@link #run()} method.
+ * <p>
+ * This map can be updated by multiple threads.
+ */
+ private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
+ new Comparator<Pair<DN, Integer>>()
+ {
+ @Override
+ public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
+ {
+ final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
+ if (compareBaseDN == 0)
+ {
+ return o1.getSecond().compareTo(o2.getSecond());
+ }
+ return compareBaseDN;
+ }
+ });
/**
* Builds a ChangeNumberIndexer object.
@@ -164,7 +191,8 @@
{
final CSN csn = updateMsg.getCSN();
lastSeenUpdates.update(baseDN, csn);
- newCursors.put(csn, baseDN);
+ // only keep the oldest CSN that will be the new cursor's starting point
+ newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
tryNotify(baseDN);
}
@@ -210,17 +238,23 @@
return true;
}
+ /**
+ * Restores in memory data needed to build the CNIndexDB, including the medium
+ * consistency point.
+ */
private void initialize() throws ChangelogException, DirectoryException
{
final ChangeNumberIndexRecord newestRecord =
changelogDB.getChangeNumberIndexDB().getNewestRecord();
if (newestRecord != null)
{
+ // restore the mediumConsistencyRUV from DB
mediumConsistencyRUV.update(
new MultiDomainServerState(newestRecord.getPreviousCookie()));
}
- // initialize the cross domain DB cursor
+ // initialize the DB cursor and the last seen updates
+ // to ensure the medium consistency CSN can move forward
final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
for (Entry<DN, List<Integer>> entry
: changelogState.getDomainToServerIds().entrySet())
@@ -235,12 +269,12 @@
ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
lastSeenUpdates.update(baseDN, latestKnownState);
}
+ resetNextChangeForInsertDBCursor();
- crossDomainDBCursor = newCompositeDBCursor();
if (newestRecord != null)
{
// restore the "previousCookie" state before shutdown
- final UpdateMsg record = crossDomainDBCursor.getRecord();
+ final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
if (!record.getCSN().equals(newestRecord.getCSN()))
{
// TODO JNR i18n safety check, should never happen
@@ -248,14 +282,14 @@
+ record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
}
mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
- crossDomainDBCursor.next();
+ nextChangeForInsertDBCursor.next();
}
// this will not be used any more. Discard for garbage collection.
this.changelogState = null;
}
- private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException
+ private void resetNextChangeForInsertDBCursor() throws ChangelogException
{
final Map<DBCursor<UpdateMsg>, DN> cursors =
new HashMap<DBCursor<UpdateMsg>, DN>();
@@ -270,7 +304,7 @@
}
final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
result.next();
- return result;
+ nextChangeForInsertDBCursor = result;
}
private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn)
@@ -286,13 +320,27 @@
if (cursor == null)
{
final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
- cursor = domainDB.getCursorFrom(baseDN, serverId, csn);
+ // use an older CSN because getCursorFrom() starts after the given CSN
+ final CSN anOlderCSN = getPrecedingCSN(csn);
+ cursor = domainDB.getCursorFrom(baseDN, serverId, anOlderCSN);
map.put(serverId, cursor);
return false;
}
return true;
}
+ /**
+ * Returns the immediately preceding CSN.
+ */
+ private CSN getPrecedingCSN(CSN csn)
+ {
+ if (csn.getSeqnum() > 0)
+ {
+ return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
+ }
+ return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
+ }
+
/** {@inheritDoc} */
@Override
public void run()
@@ -305,83 +353,96 @@
* used.
*/
initialize();
- }
- catch (DirectoryException e)
- {
- // TODO JNR error message i18n
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- return;
+
+ while (!isShutdownInitiated())
+ {
+ try
+ {
+ if (doClear.get())
+ {
+ removeAllCursors();
+ // No need to use CAS here because it is only for unit tests and at
+ // this point all will have been cleaned up anyway.
+ doClear.set(false);
+ }
+ else
+ {
+ createNewCursors();
+ }
+
+ final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
+ if (msg == null)
+ {
+ synchronized (this)
+ {
+ wait();
+ }
+ // advance cursor, success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
+ // loop to check whether new changes have been added to the
+ // ReplicaDBs
+ continue;
+ }
+
+ final CSN csn = msg.getCSN();
+ final DN baseDN = nextChangeForInsertDBCursor.getData();
+ // FIXME problem: what if the serverId is not part of the ServerState?
+ // right now, thread will be blocked
+ if (!canMoveForwardMediumConsistencyPoint(baseDN))
+ {
+ // the oldest record to insert is newer than the medium consistency
+ // point. Let's wait for a change that can be published.
+ synchronized (this)
+ {
+ // double check to protect against a missed call to notify()
+ if (!canMoveForwardMediumConsistencyPoint(baseDN))
+ {
+ wait();
+ // loop to check if changes older than the medium consistency
+ // point have been added to the ReplicaDBs
+ continue;
+ }
+ }
+ }
+
+
+ // OK, the oldest change is older than the medium consistency point
+ // let's publish it to the CNIndexDB.
+
+ // Next if statement is ugly but ensures the first change will not be
+ // immediately trimmed from the CNIndexDB. Yuck!
+ if (mediumConsistencyRUV.isEmpty())
+ {
+ mediumConsistencyRUV.replace(baseDN, new ServerState());
+ }
+ final String previousCookie = mediumConsistencyRUV.toString();
+ final ChangeNumberIndexRecord record =
+ new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
+ changelogDB.getChangeNumberIndexDB().addRecord(record);
+ moveForwardMediumConsistencyPoint(csn, baseDN);
+
+ // advance cursor, success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
+ }
+ catch (InterruptedException ignored)
+ {
+ // was shutdown called? loop to figure it out.
+ Thread.currentThread().interrupt();
+ }
+ }
+ removeAllCursors();
}
catch (ChangelogException e)
{
- // TODO JNR error message i18n
if (debugEnabled())
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- return;
+ // TODO JNR error message i18n
}
-
- while (!isShutdownInitiated())
+ catch (DirectoryException e)
{
- try
- {
- createNewCursors();
-
- final UpdateMsg msg = crossDomainDBCursor.getRecord();
- if (msg == null)
- {
- synchronized (this)
- {
- wait();
- }
- // advance cursor, success/failure will be checked later
- crossDomainDBCursor.next();
- // loop to check whether new changes have been added to the ReplicaDBs
- continue;
- }
-
- final CSN csn = msg.getCSN();
- final DN baseDN = crossDomainDBCursor.getData();
- // FIXME problem: what if the serverId is not part of the ServerState?
- // right now, thread will be blocked
- if (!canMoveForwardMediumConsistencyPoint(baseDN))
- {
- // the oldest record to insert is newer than the medium consistency
- // point. Let's wait for a change that can be published.
- synchronized (this)
- {
- // double check to protect against a missed call to notify()
- if (!canMoveForwardMediumConsistencyPoint(baseDN))
- {
- wait();
- // loop to check if changes older than the medium consistency
- // point have been added to the ReplicaDBs
- continue;
- }
- }
- }
-
- // OK, the oldest change is older than the medium consistency point
- // let's publish it to the CNIndexDB
- final String previousCookie = mediumConsistencyRUV.toString();
- final ChangeNumberIndexRecord record =
- new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
- changelogDB.getChangeNumberIndexDB().addRecord(record);
- moveForwardMediumConsistencyPoint(csn, baseDN);
-
- // advance cursor, success/failure will be checked later
- crossDomainDBCursor.next();
- }
- catch (ChangelogException e)
- {
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- // TODO JNR error message i18n
- }
- catch (InterruptedException ignored)
- {
- // was shutdown called?
- }
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ // TODO JNR error message i18n
}
}
@@ -402,20 +463,32 @@
}
}
+ private void removeAllCursors() throws ChangelogException
+ {
+ for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+ {
+ StaticUtils.close(map.values());
+ }
+ allCursors.clear();
+ newCursors.clear();
+ resetNextChangeForInsertDBCursor();
+ }
+
private void removeCursor(final DN baseDN, final CSN csn)
{
- for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
- .entrySet())
+ for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
+ : allCursors.entrySet())
{
- if (baseDN.equals(entry.getKey()))
+ if (baseDN.equals(entry1.getKey()))
{
- final Set<Integer> serverIds = entry.getValue().keySet();
- for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
+ for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
+ entry1.getValue().entrySet().iterator(); iter.hasNext();)
{
- final int serverId = iter.next();
- if (csn.getServerId() == serverId)
+ final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
+ if (csn.getServerId() == entry2.getKey())
{
iter.remove();
+ StaticUtils.close(entry2.getValue());
return;
}
}
@@ -428,12 +501,13 @@
if (!newCursors.isEmpty())
{
boolean newCursorAdded = false;
- for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
- iter.hasNext();)
+ for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
+ newCursors.entrySet().iterator(); iter.hasNext();)
{
- final Entry<CSN, DN> entry = iter.next();
- final CSN csn = entry.getKey();
- if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
+ final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
+ final DN baseDN = entry.getKey().getFirst();
+ final CSN csn = entry.getValue();
+ if (!ensureCursorExists(baseDN, csn.getServerId(), csn))
{
newCursorAdded = true;
}
@@ -441,9 +515,29 @@
}
if (newCursorAdded)
{
- crossDomainDBCursor = newCompositeDBCursor();
+ resetNextChangeForInsertDBCursor();
}
}
}
+ /**
+ * Asks the current thread to clear its state.
+ * <p>
+ * This method is only useful for unit tests.
+ */
+ public void clear()
+ {
+ doClear.set(true);
+ synchronized (this)
+ {
+ notify();
+ }
+ while (doClear.get())
+ {
+ // wait until clear() has been done by thread
+ // ensures unit tests wait that this thread's state is cleaned up
+ Thread.yield();
+ }
+ }
+
}
--
Gitblit v1.10.0