| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); |
| | | private final ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>>(); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | | synchronized (replicaCursors) |
| | | { |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaID); |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaId); |
| | | if (cursors == null) |
| | | { |
| | | cursors = new ArrayList<ReplicaCursor>(); |
| | | replicaCursors.put(replicaID, cursors); |
| | | replicaCursors.put(replicaId, cursors); |
| | | } |
| | | cursors.add(replicaCursor); |
| | | } |
| | |
| | | else if (cursor instanceof ReplicaCursor) |
| | | { |
| | | final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | |
| | | |
| | | private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId)); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (ReplicaCursor cursor : cursors) |