| | |
| | | /** Debug tracer for this class, obtained from {@code getTracer()}. */ |
| | | protected static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * {@link DBCursor} implementation that iterates across all the ReplicaDBs of |
| | | * a replication domain, advancing from the oldest to the newest change across |
| | | * all replicaDBs. |
| | | */ |
| | | private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private final DN baseDN; |
| | | private UpdateMsg currentChange; |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all replicaDBs. |
| | | */ |
| | | private final NavigableSet<DBCursor<UpdateMsg>> cursors = |
| | | new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | |
| | | @Override |
| | | public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | }); |
| | | |
| | | public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | { |
| | | this.baseDN = baseDN; |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState.getCSN(serverId); |
| | | addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN)); |
| | | } |
| | | } |
| | | |
| | | private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | } |
| | | return EMPTY_CURSOR; |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (cursors.isEmpty()) |
| | | { |
| | | currentChange = null; |
| | | return false; |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); |
| | | currentChange = cursor.getRecord(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(cursor); |
| | | return true; |
| | | } |
| | | |
| | | void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor) |
| | | { |
| | | if (cursor.getRecord() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | | else |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentChange; |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | StaticUtils.close(cursors); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " baseDN=" + baseDN |
| | | + " currentChange=" + currentChange + " open cursors=" + cursors; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | * <p> |
| | | * When removing a domainMap, code: |
| | |
| | | /** |
| | |  * Returns a single cursor over all the replicaDBs known for the base DN, |
| | |  * each one positioned after the CSN recorded for its server id in the |
| | |  * provided server state. |
| | |  * |
| | |  * @param baseDN |
| | |  *          the base DN whose replicaDBs must be iterated |
| | |  * @param startAfterServerState |
| | |  *          gives, per server id, the CSN after which to start |
| | |  * @return a cursor combining the per-replicaDB cursors |
| | |  * @throws ChangelogException |
| | |  *           if a cursor cannot be opened or positioned |
| | |  */ |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, |
| | |     ServerState startAfterServerState) throws ChangelogException |
| | | { |
| | |   final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | |   final List<DBCursor<UpdateMsg>> cursors = |
| | |       new ArrayList<DBCursor<UpdateMsg>>(serverIds.size()); |
| | |   boolean completed = false; |
| | |   try |
| | |   { |
| | |     for (int serverId : serverIds) |
| | |     { |
| | |       // get the last already sent CSN from that server to get a cursor |
| | |       final CSN lastCSN = startAfterServerState.getCSN(serverId); |
| | |       cursors.add(getCursorFrom(baseDN, serverId, lastCSN)); |
| | |     } |
| | |     final DBCursor<UpdateMsg> result = new CompositeDBCursor(cursors); |
| | |     completed = true; |
| | |     return result; |
| | |   } |
| | |   finally |
| | |   { |
| | |     if (!completed) |
| | |     { |
| | |       // Do not leak the cursors opened before the failure. |
| | |       StaticUtils.close(cursors); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | } |
| | | return EMPTY_CURSOR; |
| | | } |
| | | |
| | | private ServerState buildServerState(DN baseDN, CSN startAfterCSN) |