| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY" |
| | | + " is not supported for the JE implementation fo changelog"); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startCSN); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); |
| | | |
| | | synchronized (replicaCursors) |
| | | { |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaID); |
| | | if (cursors == null) |
| | | { |
| | | cursors = new ArrayList<ReplicaCursor>(); |
| | | replicaCursors.put(replicaID, cursors); |
| | | } |
| | | cursors.add(replicaCursor); |
| | | } |
| | | |
| | | return replicaCursor; |
| | | } |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | |
| | | } |
| | | } |
| | | } |
| | | else if (cursor instanceof ReplicaCursor) |
| | | { |
| | | final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | { |
| | | indexer.replicaOffline(baseDN, offlineCSN); |
| | | } |
| | | updateCursorsWithOfflineCSN(baseDN, offlineCSN); |
| | | } |
| | | |
| | | private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN)); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (ReplicaCursor cursor : cursors) |
| | | { |
| | | cursor.setOfflineCSN(offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |