| | |
| | | * @param <Data> |
| | | * The type of data associated with each cursor |
| | | */ |
| | | public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** Whether this composite should try to recycle exhausted cursors. */ |
| | | private final boolean recycleExhaustedCursors; |
| | | /** |
| | | * These cursors are considered exhausted because they had no new changes the |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | | * <p> |
| | | * New cursors for this Map must be created from the same thread that will |
| | | * make use of them. When this rule is not obeyed, a JE exception will be |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | private final TreeMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a CompositeDBCursor using the provided collection of cursors. |
| | | * |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | * @param recycleExhaustedCursors |
| | | * whether a call to {@link #next()} tries to recycle exhausted |
| | | * cursors |
| | | */ |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, |
| | | boolean recycleExhaustedCursors) |
| | | { |
| | | this.recycleExhaustedCursors = recycleExhaustedCursors; |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | |
| | | { |
| | | return false; |
| | | } |
| | | final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; |
| | | state = READY; |
| | | if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) |
| | | |
| | | if (state == UNINITIALIZED) |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. |
| | | state = READY; |
| | | } |
| | | else |
| | | { |
| | | // Previous state was READY => we must advance the first cursor |
| | | // because the UpdateMsg it is pointing has already been consumed. |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove the first cursor, then add it again after moving it forward. |
| | | final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry(); |
| | | if (cursorToAdvance != null) |
| | | { |
| | | addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue()); |
| | | } |
| | | } |
| | | |
| | | recycleExhaustedCursors(); |
| | | removeNoLongerNeededCursors(); |
| | | incorporateNewCursors(); |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | if (!exhaustedCursors.isEmpty()) |
| | | { |
| | | // try to recycle exhausted cursors in case the underlying replica DBs received new changes. |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet()) |
| | | { |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry(); |
| | | if (firstEntry != null && copy.containsKey(firstEntry.getKey())) |
| | | { |
| | | // if the first cursor was previously an exhausted cursor, |
| | | // then we have already called next() on it. |
| | | // Avoid calling it again because we know new changes have been found. |
| | | return true; |
| | | addCursor(entry.getKey(), entry.getValue()); |
| | | } |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and add again the cursor after moving it forward. |
| | | if (advanceNonExhaustedCursors) |
| | | { |
| | | Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry(); |
| | | if (firstEntry != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = firstEntry.getKey(); |
| | | cursor.next(); |
| | | put(firstEntry); |
| | | } |
| | | } |
| | | // no cursors are left with changes. |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | | private void removeNoLongerNeededCursors() |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | final Data data = entry.getValue(); |
| | | if (cursor.getRecord() != null) |
| | | for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator = |
| | | cursors.entrySet().iterator(); iterator.hasNext();) |
| | | { |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next(); |
| | | final Data data = entry.getValue(); |
| | | if (isCursorNoLongerNeededFor(data)) |
| | | { |
| | | entry.getKey().close(); |
| | | iterator.remove(); |
| | | cursorRemoved(data); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Adds a cursor to this composite cursor. It first calls |
| | | * {@link DBCursor#next()} to verify whether it is exhausted or not. |
| | | * |
| | | * @param cursor |
| | | * the cursor to add to this composite |
| | | * @param data |
| | | * the data associated to the provided cursor |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException |
| | | { |
| | | if (cursor.next()) |
| | | { |
| | | this.cursors.put(cursor, data); |
| | | } |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | // Cannot call incorporateNewCursors() here because |
| | | // somebody might have already called DBCursor.getRecord() and read the record |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Called when implementors should incorporate new cursors into the current |
| | | * composite DBCursor. Implementors should call |
| | | * {@link #addCursor(DBCursor, Object)} to do so. |
| | | * |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected abstract void incorporateNewCursors() throws ChangelogException; |
| | | |
| | | /** |
| | | * Returns whether the cursor associated to the provided data should be removed. |
| | | * |
| | | * @param data the data associated to the cursor to be tested |
| | | * @return true if the cursor associated to the provided data should be removed, |
| | | * false otherwise |
| | | */ |
| | | protected abstract boolean isCursorNoLongerNeededFor(Data data); |
| | | |
| | | /** |
| | | * Notifies that the cursor associated to the provided data has been removed. |
| | | * |
| | | * @param data |
| | | * the data associated to the removed cursor |
| | | */ |
| | | protected abstract void cursorRemoved(Data data); |
| | | |
| | | /** |
| | | * Returns the data associated to the cursor that returned the current record. |
| | | * |
| | | * @return the data associated to the cursor that returned the current record. |
| | |
| | | @Override |
| | | public void close() |
| | | { |
| | | state = CLOSED; |
| | | StaticUtils.close(cursors.keySet()); |
| | | StaticUtils.close(exhaustedCursors.keySet()); |
| | | cursors.clear(); |
| | | exhaustedCursors.clear(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |