From 60f8d8d4575206697f47c040d4272dee27251bab Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Apr 2014 13:56:22 +0000
Subject: [PATCH] OPENDJ-1430 Some changes are missing from the external changelog changeNumber
---
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 92 +++++++++++++++++++++++++++++++++-------------
1 files changed, 66 insertions(+), 26 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 46994c5..52ef4c6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -45,8 +45,23 @@
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
- private UpdateMsg currentRecord;
- private Data currentData;
+ private static final byte UNINITIALIZED = 0;
+ private static final byte READY = 1;
+ private static final byte CLOSED = 2;
+
+ /**
+ * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
+ * {@link #CLOSED}
+ */
+ private byte state = UNINITIALIZED;
+
+ /** Whether this composite should try to recycle exhausted cursors. */
+ private final boolean recycleExhaustedCursors;
+ /**
+ * These cursors are considered exhausted because they had no new changes the
+ * last time {@link DBCursor#next()} was called on them. Exhausted cursors
+ * might be recycled at some point when they start returning changes again.
+ */
private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
new HashMap<DBCursor<UpdateMsg>, Data>();
/**
@@ -71,9 +86,14 @@
*
* @param cursors
* the cursors that will be iterated upon.
+ * @param recycleExhaustedCursors
+ * whether a call to {@link #next()} tries to recycle exhausted
+ * cursors
*/
- public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
+ public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
+ boolean recycleExhaustedCursors)
{
+ this.recycleExhaustedCursors = recycleExhaustedCursors;
for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
{
put(entry);
@@ -84,10 +104,16 @@
@Override
public boolean next() throws ChangelogException
{
- if (!exhaustedCursors.isEmpty())
+ if (state == CLOSED)
+ {
+ return false;
+ }
+ final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
+ state = READY;
+ if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
{
// try to recycle empty cursors in case the underlying ReplicaDBs received
- // new changes. Copy the List to avoid ConcurrentModificationExceptions.
+ // new changes.
final Map<DBCursor<UpdateMsg>, Data> copy =
new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
@@ -96,25 +122,30 @@
entry.getKey().next();
put(entry);
}
- }
-
- if (cursors.isEmpty())
- {
- // no cursors are left with changes.
- currentRecord = null;
- currentData = null;
- return false;
+ final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
+ if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
+ {
+ // if the first cursor was previously an exhausted cursor,
+ // then we have already called next() on it.
+ // Avoid calling it again because we know new changes have been found.
+ return true;
+ }
}
// To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and eventually add again a cursor (after moving it forward).
- final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
- final DBCursor<UpdateMsg> cursor = entry.getKey();
- currentRecord = cursor.getRecord();
- currentData = entry.getValue();
- cursor.next();
- put(entry);
- return true;
+ // to remove and add again the cursor after moving it forward.
+ if (advanceNonExhaustedCursors)
+ {
+ Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
+ if (firstEntry != null)
+ {
+ final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
+ cursor.next();
+ put(firstEntry);
+ }
+ }
+ // no cursors are left with changes.
+ return !cursors.isEmpty();
}
private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
@Override
public UpdateMsg getRecord()
{
- return currentRecord;
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+ if (entry != null)
+ {
+ return entry.getKey().getRecord();
+ }
+ return null;
}
/**
@@ -145,7 +181,12 @@
*/
public Data getData()
{
- return currentData;
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+ if (entry != null)
+ {
+ return entry.getValue();
+ }
+ return null;
}
/** {@inheritDoc} */
@@ -160,8 +201,7 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentRecord=" + currentRecord
- + " currentData=" + currentData + " openCursors=" + cursors
+ return getClass().getSimpleName() + " openCursors=" + cursors
+ " exhaustedCursors=" + exhaustedCursors;
}
--
Gitblit v1.10.0