| | |
| | | * {@link DBCursor}s, advancing from the oldest to the newest change across all |
| | | * cursors. |
| | | * |
| | | * @param <Data> |
| | | * @param <T> |
| | | * The type of data associated with each cursor |
| | | * \@NotThreadSafe |
| | | */ |
| | | abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | | * might be recycled at some point when they start returning changes again. |
| | | */ |
| | | private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(); |
| | | private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, T>(); |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final TreeMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | private final TreeMap<DBCursor<UpdateMsg>, T> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, T>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | |
| | | // (which UpdateMsg has been consumed). |
| | | // To keep the cursors' order consistent in the TreeMap, it is necessary |
| | | // to remove the first cursor, then add it again after moving it forward. |
| | | final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = |
| | | final Entry<DBCursor<UpdateMsg>, T> cursorToAdvance = |
| | | state != UNINITIALIZED ? cursors.pollFirstEntry() : null; |
| | | state = READY; |
| | | recycleExhaustedCursors(); |
| | |
| | | if (!exhaustedCursors.isEmpty()) |
| | | { |
| | | // try to recycle exhausted cursors in case the underlying replica DBs received new changes. |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | final Map<DBCursor<UpdateMsg>, T> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, T>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet()) |
| | | for (Entry<DBCursor<UpdateMsg>, T> entry : copy.entrySet()) |
| | | { |
| | | addCursor(entry.getKey(), entry.getValue()); |
| | | } |
| | |
| | | * @param dataToFind |
| | | * the data for which the cursor must be found and removed |
| | | */ |
| | | protected void removeCursor(final Data dataToFind) |
| | | protected void removeCursor(final T dataToFind) |
| | | { |
| | | removeCursor(this.cursors, dataToFind); |
| | | removeCursor(this.exhaustedCursors, dataToFind); |
| | | } |
| | | |
| | | private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind) |
| | | private void removeCursor(Map<DBCursor<UpdateMsg>, T> cursors, T dataToFind) |
| | | { |
| | | for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter = |
| | | for (Iterator<Entry<DBCursor<UpdateMsg>, T>> cursorIter = |
| | | cursors.entrySet().iterator(); cursorIter.hasNext();) |
| | | { |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next(); |
| | | final Entry<DBCursor<UpdateMsg>, T> entry = cursorIter.next(); |
| | | if (dataToFind.equals(entry.getValue())) |
| | | { |
| | | entry.getKey().close(); |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException |
| | | protected void addCursor(final DBCursor<UpdateMsg> cursor, final T data) throws ChangelogException |
| | | { |
| | | if (cursor.next()) |
| | | { |
| | |
| | | { |
| | | // Cannot call incorporateNewCursors() here because |
| | | // somebody might have already called DBCursor.getRecord() and read the record |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | final Entry<DBCursor<UpdateMsg>, T> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | | return entry.getKey().getRecord(); |
| | |
| | | * |
| | | * @return the data associated with the cursor that returned the current record. |
| | | */ |
| | | public Data getData() |
| | | public T getData() |
| | | { |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | final Entry<DBCursor<UpdateMsg>, T> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | | return entry.getValue(); |
| | |
| | | * cursor. In each pair, the data or the update message may be |
| | | * {@code null}, but at least one of them is non-null. |
| | | */ |
| | | public List<Pair<Data, UpdateMsg>> getSnapshot() |
| | | public List<Pair<T, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<Data, UpdateMsg>> snapshot = new ArrayList<Pair<Data, UpdateMsg>>(); |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>(); |
| | | for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet()) |
| | | { |
| | | final UpdateMsg updateMsg = entry.getKey().getRecord(); |
| | | final Data data = entry.getValue(); |
| | | final T data = entry.getValue(); |
| | | if (updateMsg != null || data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, updateMsg)); |
| | | } |
| | | } |
| | | for (Data data : exhaustedCursors.values()) |
| | | for (T data : exhaustedCursors.values()) |
| | | { |
| | | if (data != null) |
| | | { |