| | |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | |
| | | * {@link DBCursor} implementation that iterates across a Collection of |
| | | * {@link DBCursor}s, advancing from the oldest to the newest change cross all |
| | | * cursors. |
| | | * |
| | | * @param <Data> |
| | | * The type of data associated with each cursor |
| | | */ |
| | | final class CompositeDBCursor implements DBCursor<UpdateMsg> |
| | | final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private UpdateMsg currentChange; |
| | | private final List<DBCursor<UpdateMsg>> exhaustedCursors = |
| | | new ArrayList<DBCursor<UpdateMsg>>(); |
| | | private UpdateMsg currentRecord; |
| | | private Data currentData; |
| | | private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(); |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | | */ |
| | | private final NavigableSet<DBCursor<UpdateMsg>> cursors = |
| | | new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | | public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | }); |
| | | private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | | public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a CompositeDBCursor using the provided collection of cursors. |
| | |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | */ |
| | | public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors) |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors) |
| | | { |
| | | for (DBCursor<UpdateMsg> cursor : cursors) |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | add(cursor); |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. Copy the List to avoid ConcurrentModificationExceptions. |
| | | final DBCursor<UpdateMsg>[] copy = |
| | | exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]); |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | | for (DBCursor<UpdateMsg> cursor : copy) |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet()) |
| | | { |
| | | cursor.next(); |
| | | add(cursor); |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | if (cursors.isEmpty()) |
| | | { |
| | | // no cursors are left with changes. |
| | | currentChange = null; |
| | | currentRecord = null; |
| | | currentData = null; |
| | | return false; |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); |
| | | currentChange = cursor.getRecord(); |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry(); |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | currentRecord = cursor.getRecord(); |
| | | currentData = entry.getValue(); |
| | | cursor.next(); |
| | | add(cursor); |
| | | put(entry); |
| | | return true; |
| | | } |
| | | |
| | | private void add(DBCursor<UpdateMsg> cursor) |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | final Data data = entry.getValue(); |
| | | if (cursor.getRecord() != null) |
| | | { |
| | | this.cursors.add(cursor); |
| | | this.cursors.put(cursor, data); |
| | | } |
| | | else |
| | | { |
| | | this.exhaustedCursors.add(cursor); |
| | | this.exhaustedCursors.put(cursor, data); |
| | | } |
| | | } |
| | | |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentChange; |
| | | return currentRecord; |
| | | } |
| | | |
| | | /** |
| | | * Returns the data associated to the cursor that returned the current record. |
| | | * |
| | | * @return the data associated to the cursor that returned the current record. |
| | | */ |
| | | public Data getData() |
| | | { |
| | | return currentData; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | StaticUtils.close(cursors); |
| | | StaticUtils.close(exhaustedCursors); |
| | | StaticUtils.close(cursors.keySet()); |
| | | StaticUtils.close(exhaustedCursors.keySet()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " currentChange=" + currentChange |
| | | + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors; |
| | | return getClass().getSimpleName() + " currentRecord=" + currentRecord |
| | | + " currentData=" + currentData + " openCursors=" + cursors |
| | | + " exhaustedCursors=" + exhaustedCursors; |
| | | } |
| | | |
| | | } |