From dc02a21390ac3b24e2eaa2505c823a33fd3eee07 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 23 Jun 2015 08:06:48 +0000
Subject: [PATCH] OPENDJ-2141 (CR-7337) Cannot find entry in cn=changelog when searching with changelogCookie filter
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java | 40 +++-----
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java | 66 ++++--------
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 6
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 5
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java | 51 ----------
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 7
opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java | 60 ++++++++---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 7
8 files changed, 92 insertions(+), 150 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
index 1753112..48ce4c1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
@@ -1077,7 +1077,7 @@
continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
if (continueSearch)
{
- final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
+ final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN());
if (updateMsg != null)
{
continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
@@ -1092,20 +1092,48 @@
private void initializeCookieForChangeNumberMode(
MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
{
- ECLMultiDomainDBCursor eclCursor = null;
- try
+ // Initialize the multi domain cursor only from the change number index record.
+ // The cookie is always empty at this stage.
+ CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN());
+ MultiDomainServerState unused = new MultiDomainServerState();
+ MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options);
+ try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
{
- cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
- CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
- MultiDomainDBCursor cursor =
- getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, options);
- eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
- eclCursor.next();
- cookie.update(eclCursor.toCookie());
+ updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord);
}
- finally
+ }
+
+ /**
+ * Rebuilds the changelogcookie starting at the newest change number index record.
+ * <p>
+ * It updates the provided cookie with the changes from the provided ECL cursor,
+ * up to (and including) the provided change number index record.
+ * <p>
+ * Therefore, after calling this method, the cursor is positioned
+ * to the change immediately following the provided change number index record.
+ *
+ * @param cookie the cookie to update
+ * @param cursor the cursor where to read changes from
+ * @param cnIndexRecord the change number index record to go right after
+ * @throws ChangelogException if any problem occurs
+ */
+ public static void updateCookieToMediumConsistencyPoint(
+ MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord)
+ throws ChangelogException
+ {
+ if (cnIndexRecord == null)
{
- close(eclCursor);
+ return;
+ }
+
+ while (cursor.next())
+ {
+ UpdateMsg updateMsg = cursor.getRecord();
+ if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0)
+ {
+ break;
+ }
+ cookie.update(cursor.getData(), updateMsg.getCSN());
}
}
@@ -1135,15 +1163,13 @@
* If inconsistency is detected between the available update
* messages and the provided cnIndexRecord
*/
- private UpdateMsg findReplicaUpdateMessage(
- final ChangeNumberIndexRecord cnIndexRecord,
- final MultiDomainDBCursor replicaUpdatesCursor)
- throws DirectoryException, ChangelogException
+ private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn)
+ throws ChangelogException, DirectoryException
{
while (true)
{
final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
- final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
+ final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN());
if (compareIndexWithUpdateMsg < 0) {
// Either update message has been purged or baseDN has been removed from changelogDB,
// ignore current index record and go to the next one
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
index 0414235..03e3484 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -71,8 +71,7 @@
* for the supplied domain baseDNs. If a supplied domain is
* {@link DN#NULL_DN}, then all domains will be cleared.
*/
- private final ConcurrentSkipListSet<DN> domainsToClear =
- new ConcurrentSkipListSet<DN>();
+ private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
private final ChangelogDB changelogDB;
/** Only used for initialization, and then discarded. */
private ChangelogState changelogState;
@@ -108,6 +107,7 @@
* @NonNull
*/
private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
+ private MultiDomainServerState cookie = new MultiDomainServerState();
/**
* Builds a ChangeNumberIndexer object.
@@ -317,30 +317,16 @@
private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
{
- final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
-
- CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
- MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
- domainDB.getCursorFrom(cookieWithNewestCSN, options);
+ // Initialize the multi domain cursor only from the change number index record.
+ // The cookie is always empty at this stage.
+ final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
+ final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
+ final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
+ final MultiDomainServerState unused = new MultiDomainServerState();
+ MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);
nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
- nextChangeForInsertDBCursor.next();
- }
-
- /** Returns a cookie initialised with the newest CSN for each replica. */
- private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
- {
- final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
- final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
- if (newestRecord != null)
- {
- final CSN newestCsn = newestRecord.getCSN();
- for (DN baseDN : changelogState.getDomainToServerIds().keySet())
- {
- cookieWithNewestCSN.update(baseDN, newestCsn);
- }
- }
- return cookieWithNewestCSN;
+ ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
}
private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
@@ -477,7 +463,11 @@
// let's publish it to the CNIndexDB.
final long changeNumber = changelogDB.getChangeNumberIndexDB()
.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
- MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
+ if (!cookie.update(baseDN, csn))
+ {
+ throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn
+ + ") would have updated the cookie=" + cookie + ", but it did not");
+ }
notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
moveForwardMediumConsistencyPoint(csn, baseDN);
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
index 15b5a7d..17aab83 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
@@ -25,10 +25,13 @@
*/
package org.opends.server.replication.server.changelog.file;
-import java.util.*;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
-import org.forgerock.util.Pair;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -46,15 +49,11 @@
*/
abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg>
{
-
private static final byte UNINITIALIZED = 0;
private static final byte READY = 1;
private static final byte CLOSED = 2;
- /**
- * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
- * {@link #CLOSED}
- */
+ /** The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED} */
private byte state = UNINITIALIZED;
/**
@@ -62,8 +61,7 @@
* last time {@link DBCursor#next()} was called on them. Exhausted cursors
* might be recycled at some point when they start returning changes again.
*/
- private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors =
- new HashMap<DBCursor<UpdateMsg>, T>();
+ private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = new HashMap<>();
/**
* The cursors are sorted based on the current change of each cursor to
* consider the next change across all available cursors.
@@ -73,8 +71,7 @@
* thrown about
* "Non-transactional Cursors may not be used in multiple threads;".
*/
- private final TreeMap<DBCursor<UpdateMsg>, T> cursors =
- new TreeMap<DBCursor<UpdateMsg>, T>(
+ private final TreeMap<DBCursor<UpdateMsg>, T> cursors = new TreeMap<>(
new Comparator<DBCursor<UpdateMsg>>()
{
@Override
@@ -82,7 +79,20 @@
{
final CSN csn1 = o1.getRecord().getCSN();
final CSN csn2 = o2.getRecord().getCSN();
- return CSN.compare(csn1, csn2);
+ int cmpCsn = CSN.compare(csn1, csn2);
+ if (cmpCsn == 0
+ && o1 instanceof CompositeDBCursor
+ && o2 instanceof CompositeDBCursor)
+ {
+ // Ensures a consistent order when the CSNs are equal (rare in practice)
+ T data1 = ((CompositeDBCursor<T>) o1).getData();
+ T data2 = ((CompositeDBCursor<T>) o1).getData();
+ if (data1 instanceof Comparable && data2 instanceof Comparable)
+ {
+ return ((Comparable<T>) data1).compareTo(data2);
+ }
+ }
+ return cmpCsn;
}
});
@@ -215,36 +225,6 @@
return null;
}
- /**
- * Returns a snapshot of this cursor.
- *
- * @return a list of (Data, UpdateMsg) pairs representing the state of the
- * cursor. In each pair, the data or the update message may be
- * {@code null}, but at least one of them is non-null.
- */
- public List<Pair<T, UpdateMsg>> getSnapshot()
- {
- final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>();
- for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet())
- {
- final UpdateMsg updateMsg = entry.getKey().getRecord();
- final T data = entry.getValue();
- if (updateMsg != null || data != null)
- {
- snapshot.add(Pair.of(data, updateMsg));
- }
- }
- for (T data : exhaustedCursors.values())
- {
- if (data != null)
- {
- snapshot.add(Pair.of(data, (UpdateMsg) null));
- }
- }
- return snapshot;
- }
-
- /** {@inheritDoc} */
@Override
public void close()
{
@@ -255,12 +235,10 @@
exhaustedCursors.clear();
}
- /** {@inheritDoc} */
@Override
public String toString()
{
return getClass().getSimpleName() + " openCursors=" + cursors
+ " exhaustedCursors=" + exhaustedCursors;
}
-
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
index 19e1ab5..193eb45 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -24,11 +24,6 @@
*/
package org.opends.server.replication.server.changelog.file;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.forgerock.util.Pair;
-import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -40,7 +35,6 @@
*/
public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
{
-
private final ECLEnabledDomainPredicate predicate;
private final MultiDomainDBCursor cursor;
@@ -88,7 +82,6 @@
cursor.removeDomain(baseDN);
}
- /** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
@@ -102,59 +95,15 @@
return hasNext;
}
- /** {@inheritDoc} */
@Override
public void close()
{
cursor.close();
}
- /** {@inheritDoc} */
@Override
public String toString()
{
return getClass().getSimpleName() + " cursor=[" + cursor + ']';
}
-
- /**
- * Returns a snapshot of this cursor.
- *
- * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled
- * for the external changelog. The update message may be {@code null}.
- */
- List<Pair<DN, UpdateMsg>> getSnapshot()
- {
- final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot();
- final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>();
- for (Pair<DN, UpdateMsg> pair : snapshot)
- {
- DN baseDN = pair.getFirst();
- if (predicate.isECLEnabledDomain(baseDN))
- {
- eclSnapshot.add(pair);
- }
- }
- return eclSnapshot;
- }
-
- /**
- * Returns the cookie corresponding to the state of this cursor.
- *
- * @return a valid cookie taking into account only the base DNs enabled for
- * the external changelog
- */
- public MultiDomainServerState toCookie()
- {
- List<Pair<DN, UpdateMsg>> snapshot = getSnapshot();
- MultiDomainServerState cookie = new MultiDomainServerState();
- for (Pair<DN, UpdateMsg> pair : snapshot)
- {
- // only put base DNs where a CSN is available in the cookie
- if (pair.getSecond() != null)
- {
- cookie.update(pair.getFirst(), pair.getSecond().getCSN());
- }
- }
- return cookie;
- }
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index dadbc25..b3f70d9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -729,9 +729,10 @@
final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
+ final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
- startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
- final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
+ actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 8c8a85d..3698bb0 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -221,12 +221,11 @@
final PositionStrategy positionStrategy) throws ChangelogException
{
RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy);
- return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
+ CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
+ return new FileReplicaDBCursor(cursor, actualStartCSN, positionStrategy);
}
- /**
- * Shutdown this ReplicaDB.
- */
+ /** Shutdown this ReplicaDB. */
void shutdown()
{
if (shutdown.compareAndSet(false, true))
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index e0634f9..ec7f3c1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -764,7 +764,6 @@
return null;
}
- /** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
CursorOptions options) throws ChangelogException
@@ -772,9 +771,10 @@
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
+ final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
- startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
- final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
+ actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index f032d77..401531f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -195,12 +195,11 @@
DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
final PositionStrategy positionStrategy) throws ChangelogException
{
- return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this);
+ CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
+ return new JEReplicaDBCursor(db, actualStartCSN, matchingStrategy, positionStrategy, this);
}
- /**
- * Shutdown this ReplicaDB.
- */
+ /** Shutdown this ReplicaDB. */
void shutdown()
{
if (shutdown.compareAndSet(false, true))
--
Gitblit v1.10.0