| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = dbEnv.readChangelogState(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |