From b6ccb560e9056cc9c028812f5f63ff2e80c95c87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 154 ++++++++++++++++++++++++++++++++-------------------
1 files changed, 97 insertions(+), 57 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index ccd27c0..7f271c1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
* @param <Data>
* The type of data associated with each cursor
*/
-public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
*/
private byte state = UNINITIALIZED;
- /** Whether this composite should try to recycle exhausted cursors. */
- private final boolean recycleExhaustedCursors;
/**
* These cursors are considered exhausted because they had no new changes the
* last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
/**
* The cursors are sorted based on the current change of each cursor to
* consider the next change across all available cursors.
+ * <p>
+ * New cursors for this Map must be created from the same thread that will
+ * make use of them. When this rule is not obeyed, a JE exception will be
+ * thrown about
+ * "Non-transactional Cursors may not be used in multiple threads;".
*/
- private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+ private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
new TreeMap<DBCursor<UpdateMsg>, Data>(
new Comparator<DBCursor<UpdateMsg>>()
{
@@ -81,25 +84,6 @@
}
});
- /**
- * Builds a CompositeDBCursor using the provided collection of cursors.
- *
- * @param cursors
- * the cursors that will be iterated upon.
- * @param recycleExhaustedCursors
- * whether a call to {@link #next()} tries to recycle exhausted
- * cursors
- */
- public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
- boolean recycleExhaustedCursors)
- {
- this.recycleExhaustedCursors = recycleExhaustedCursors;
- for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
- {
- put(entry);
- }
- }
-
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
@@ -108,51 +92,75 @@
{
return false;
}
- final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
- state = READY;
- if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
+
+ if (state == UNINITIALIZED)
{
- // try to recycle empty cursors in case the underlying ReplicaDBs received
- // new changes.
+ state = READY;
+ }
+ else
+ {
+ // Previous state was READY => we must advance the first cursor
+ // because the UpdateMsg it is pointing has already been consumed.
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove the first cursor, then add it again after moving it forward.
+ final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry();
+ if (cursorToAdvance != null)
+ {
+ addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
+ }
+ }
+
+ recycleExhaustedCursors();
+ removeNoLongerNeededCursors();
+ incorporateNewCursors();
+ return !cursors.isEmpty();
+ }
+
+ private void recycleExhaustedCursors() throws ChangelogException
+ {
+ if (!exhaustedCursors.isEmpty())
+ {
+ // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
final Map<DBCursor<UpdateMsg>, Data> copy =
new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
{
- entry.getKey().next();
- put(entry);
- }
- final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
- if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
- {
- // if the first cursor was previously an exhausted cursor,
- // then we have already called next() on it.
- // Avoid calling it again because we know new changes have been found.
- return true;
+ addCursor(entry.getKey(), entry.getValue());
}
}
-
- // To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and add again the cursor after moving it forward.
- if (advanceNonExhaustedCursors)
- {
- Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
- if (firstEntry != null)
- {
- final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
- cursor.next();
- put(firstEntry);
- }
- }
- // no cursors are left with changes.
- return !cursors.isEmpty();
}
- private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
+ private void removeNoLongerNeededCursors()
{
- final DBCursor<UpdateMsg> cursor = entry.getKey();
- final Data data = entry.getValue();
- if (cursor.getRecord() != null)
+ for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
+ cursors.entrySet().iterator(); iterator.hasNext();)
+ {
+ final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
+ final Data data = entry.getValue();
+ if (isCursorNoLongerNeededFor(data))
+ {
+ entry.getKey().close();
+ iterator.remove();
+ cursorRemoved(data);
+ }
+ }
+ }
+
+ /**
+ * Adds a cursor to this composite cursor. It first calls
+ * {@link DBCursor#next()} to verify whether it is exhausted or not.
+ *
+ * @param cursor
+ * the cursor to add to this composite
+ * @param data
+ * the data associated to the provided cursor
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
+ {
+ if (cursor.next())
{
this.cursors.put(cursor, data);
}
@@ -166,6 +174,8 @@
@Override
public UpdateMsg getRecord()
{
+ // Cannot call incorporateNewCursors() here because
+ // somebody might have already called DBCursor.getRecord() and read the record
final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
if (entry != null)
{
@@ -175,6 +185,33 @@
}
/**
+ * Called when implementors should incorporate new cursors into the current
+ * composite DBCursor. Implementors should call
+ * {@link #addCursor(DBCursor, Object)} to do so.
+ *
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ protected abstract void incorporateNewCursors() throws ChangelogException;
+
+ /**
+ * Returns whether the cursor associated to the provided data should be removed.
+ *
+ * @param data the data associated to the cursor to be tested
+ * @return true if the cursor associated to the provided data should be removed,
+ * false otherwise
+ */
+ protected abstract boolean isCursorNoLongerNeededFor(Data data);
+
+ /**
+ * Notifies that the cursor associated to the provided data has been removed.
+ *
+ * @param data
+ * the data associated to the removed cursor
+ */
+ protected abstract void cursorRemoved(Data data);
+
+ /**
* Returns the data associated to the cursor that returned the current record.
*
* @return the data associated to the cursor that returned the current record.
@@ -193,8 +230,11 @@
@Override
public void close()
{
+ state = CLOSED;
StaticUtils.close(cursors.keySet());
StaticUtils.close(exhaustedCursors.keySet());
+ cursors.clear();
+ exhaustedCursors.clear();
}
/** {@inheritDoc} */
--
Gitblit v1.10.0