From c5135432faf9bbbcd496ea160d59755fba31012c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 21 Nov 2013 16:17:00 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 92 ++++++++++++++++++++++++++++-----------------
1 files changed, 57 insertions(+), 35 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 7f8e3c3..2e0e7c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.util.*;
+import java.util.Map.Entry;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -38,28 +39,33 @@
* {@link DBCursor} implementation that iterates across a Collection of
* {@link DBCursor}s, advancing from the oldest to the newest change cross all
* cursors.
+ *
+ * @param <Data>
+ * The type of data associated with each cursor
*/
-final class CompositeDBCursor implements DBCursor<UpdateMsg>
+final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
- private UpdateMsg currentChange;
- private final List<DBCursor<UpdateMsg>> exhaustedCursors =
- new ArrayList<DBCursor<UpdateMsg>>();
+ private UpdateMsg currentRecord;
+ private Data currentData;
+ private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
+ new HashMap<DBCursor<UpdateMsg>, Data>();
/**
* The cursors are sorted based on the current change of each cursor to
* consider the next change across all available cursors.
*/
- private final NavigableSet<DBCursor<UpdateMsg>> cursors =
- new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
- {
- @Override
- public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
- {
- final CSN csn1 = o1.getRecord().getCSN();
- final CSN csn2 = o2.getRecord().getCSN();
- return CSN.compare(csn1, csn2);
- }
- });
+ private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+ new TreeMap<DBCursor<UpdateMsg>, Data>(
+ new Comparator<DBCursor<UpdateMsg>>()
+ {
+ @Override
+ public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
+ {
+ final CSN csn1 = o1.getRecord().getCSN();
+ final CSN csn2 = o2.getRecord().getCSN();
+ return CSN.compare(csn1, csn2);
+ }
+ });
/**
* Builds a CompositeDBCursor using the provided collection of cursors.
@@ -67,11 +73,11 @@
* @param cursors
* the cursors that will be iterated upon.
*/
- public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
+ public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
{
- for (DBCursor<UpdateMsg> cursor : cursors)
+ for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
{
- add(cursor);
+ put(entry);
}
}
@@ -83,41 +89,46 @@
{
// try to recycle empty cursors in case the underlying ReplicaDBs received
// new changes. Copy the List to avoid ConcurrentModificationExceptions.
- final DBCursor<UpdateMsg>[] copy =
- exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]);
+ final Map<DBCursor<UpdateMsg>, Data> copy =
+ new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
- for (DBCursor<UpdateMsg> cursor : copy)
+ for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
{
- cursor.next();
- add(cursor);
+ entry.getKey().next();
+ put(entry);
}
}
if (cursors.isEmpty())
{
// no cursors are left with changes.
- currentChange = null;
+ currentRecord = null;
+ currentData = null;
return false;
}
// To keep consistent the cursors' order in the SortedSet, it is necessary
// to remove and eventually add again a cursor (after moving it forward).
- final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
- currentChange = cursor.getRecord();
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
+ final DBCursor<UpdateMsg> cursor = entry.getKey();
+ currentRecord = cursor.getRecord();
+ currentData = entry.getValue();
cursor.next();
- add(cursor);
+ put(entry);
return true;
}
- private void add(DBCursor<UpdateMsg> cursor)
+ private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
{
+ final DBCursor<UpdateMsg> cursor = entry.getKey();
+ final Data data = entry.getValue();
if (cursor.getRecord() != null)
{
- this.cursors.add(cursor);
+ this.cursors.put(cursor, data);
}
else
{
- this.exhaustedCursors.add(cursor);
+ this.exhaustedCursors.put(cursor, data);
}
}
@@ -125,23 +136,34 @@
@Override
public UpdateMsg getRecord()
{
- return currentChange;
+ return currentRecord;
+ }
+
+ /**
+ * Returns the data associated to the cursor that returned the current record.
+ *
+ * @return the data associated to the cursor that returned the current record.
+ */
+ public Data getData()
+ {
+ return currentData;
}
/** {@inheritDoc} */
@Override
public void close()
{
- StaticUtils.close(cursors);
- StaticUtils.close(exhaustedCursors);
+ StaticUtils.close(cursors.keySet());
+ StaticUtils.close(exhaustedCursors.keySet());
}
/** {@inheritDoc} */
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentChange=" + currentChange
- + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
+ return getClass().getSimpleName() + " currentRecord=" + currentRecord
+ + " currentData=" + currentData + " openCursors=" + cursors
+ + " exhaustedCursors=" + exhaustedCursors;
}
}
--
Gitblit v1.10.0