| | |
| | | { |
| | | |
| | | /** |
| | | * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a |
| | | * replication domain, advancing from the oldest to the newest change cross |
| | | * {@link DBCursor} implementation that iterates across all the ReplicaDBs of |
| | | * a replication domain, advancing from the oldest to the newest change cross |
| | | * all replicaDBs. |
| | | */ |
| | | private final class CrossReplicaDBCursor implements ReplicaDBCursor |
| | | private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private final DN baseDN; |
| | | private UpdateMsg currentChange; |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all replicaDBs. |
| | | */ |
| | | private final NavigableSet<ReplicaDBCursor> cursors = |
| | | new TreeSet<ReplicaDBCursor>(); |
| | | private final DN baseDN; |
| | | private final NavigableSet<DBCursor<UpdateMsg>> cursors = |
| | | new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | |
| | | @Override |
| | | public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | }); |
| | | |
| | | public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, |
| | | private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | |
| | | { |
| | | try |
| | | { |
| | | ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | } |
| | |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final ReplicaDBCursor cursor = cursors.pollFirst(); |
| | | currentChange = cursor.getChange(); |
| | | final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); |
| | | currentChange = cursor.getRecord(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(cursor); |
| | | return true; |
| | | } |
| | | |
| | | void addCursorIfNotEmpty(ReplicaDBCursor cursor) |
| | | void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor) |
| | | { |
| | | if (cursor.getChange() != null) |
| | | if (cursor.getRecord() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public UpdateMsg getChange() |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentChange; |
| | | } |
| | |
| | | StaticUtils.close(cursors); |
| | | } |
| | | |
| | | @Override |
| | | public int compareTo(ReplicaDBCursor o) |
| | | { |
| | | final CSN csn1 = getChange().getCSN(); |
| | | final CSN csn2 = o.getChange().getCSN(); |
| | | |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | |
| | | private static final ReplicaDBCursor EMPTY_CURSOR = new ReplicaDBCursor() |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR = |
| | | new DBCursor<UpdateMsg>() |
| | | { |
| | | |
| | | @Override |
| | | public int compareTo(ReplicaDBCursor o) |
| | | { |
| | | if (o == null) |
| | | { |
| | | throw new NullPointerException(); // as per javadoc |
| | | } |
| | | return o == this ? 0 : -1; // equal to self, but less than all the rest |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | | public UpdateMsg getChange() |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return null; |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "EmptyReplicaDBCursor"; |
| | | return "EmptyDBCursor<UpdateMsg>"; |
| | | } |
| | | }; |
| | | |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN) |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN) |
| | | { |
| | | // Builds a new serverState for all the serverIds in the replication domain |
| | | // to ensure we get cursors starting after the provided CSN. |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, |
| | | ServerState startAfterServerState) |
| | | { |
| | | return new CrossReplicaDBCursor(baseDN, startAfterServerState); |