| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private UpdateMsg currentRecord; |
| | | private Data currentData; |
| | | private static final byte UNINITIALIZED = 0; |
| | | private static final byte READY = 1; |
| | | private static final byte CLOSED = 2; |
| | | |
| | | /** |
| | | * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or |
| | | * {@link #CLOSED} |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** Whether this composite should try to recycle exhausted cursors. */ |
| | | private final boolean recycleExhaustedCursors; |
| | | /** |
| | | * These cursors are considered exhausted because they had no new changes the |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | | * might be recycled at some point when they start returning changes again. |
| | | */ |
| | | private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(); |
| | | /** |
| | |
| | | * |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | * @param recycleExhaustedCursors |
| | | * whether a call to {@link #next()} tries to recycle exhausted |
| | | * cursors |
| | | */ |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors) |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, |
| | | boolean recycleExhaustedCursors) |
| | | { |
| | | this.recycleExhaustedCursors = recycleExhaustedCursors; |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | put(entry); |
| | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (!exhaustedCursors.isEmpty()) |
| | | if (state == CLOSED) |
| | | { |
| | | return false; |
| | | } |
| | | final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; |
| | | state = READY; |
| | | if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. Copy the List to avoid ConcurrentModificationExceptions. |
| | | // new changes. |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | if (cursors.isEmpty()) |
| | | { |
| | | // no cursors are left with changes. |
| | | currentRecord = null; |
| | | currentData = null; |
| | | return false; |
| | | final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry(); |
| | | if (firstEntry != null && copy.containsKey(firstEntry.getKey())) |
| | | { |
| | | // if the first cursor was previously an exhausted cursor, |
| | | // then we have already called next() on it. |
| | | // Avoid calling it again because we know new changes have been found. |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry(); |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | currentRecord = cursor.getRecord(); |
| | | currentData = entry.getValue(); |
| | | cursor.next(); |
| | | put(entry); |
| | | return true; |
| | | // to remove and add again the cursor after moving it forward. |
| | | if (advanceNonExhaustedCursors) |
| | | { |
| | | Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry(); |
| | | if (firstEntry != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = firstEntry.getKey(); |
| | | cursor.next(); |
| | | put(firstEntry); |
| | | } |
| | | } |
| | | // no cursors are left with changes. |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentRecord; |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | | return entry.getKey().getRecord(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Data getData() |
| | | { |
| | | return currentData; |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | | return entry.getValue(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " currentRecord=" + currentRecord |
| | | + " currentData=" + currentData + " openCursors=" + cursors |
| | | return getClass().getSimpleName() + " openCursors=" + cursors |
| | | + " exhaustedCursors=" + exhaustedCursors; |
| | | } |
| | | |