| | |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | | import org.opends.server.replication.server.changelog.je.DomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | /** Shared cursor constant built as a FileReplicaDBCursor over a Log.EmptyLogCursor, with null passed as the second constructor argument. NOTE(review): the two initializer lines below look like the before and after sides of a diff (the second one additionally passes AFTER_MATCHING_KEY) - confirm which one is current. */ |
| | | static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null); |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY); |
| | | |
| | | /** |
| | | * Creates a new changelog DB. |
| | |
| | | |
| | | /** {@inheritDoc} Creates a MultiDomainDBCursor, adds it to registeredMultiDomainCursors, then calls addDomain on it for every base DN key of domainToReplicaDBs, passing the server state that the supplied multi-domain state holds for that base DN; the cursor is then returned. NOTE(review): the state argument is dereferenced without a null check, whereas the per-domain overload tolerates a null state - confirm callers never pass null. NOTE(review): the paired signature and statement lines below look like the before and after sides of a diff (startAfterState versus startState plus positionStrategy) - confirm which side is current. */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | cursor.addDomain(baseDN, startState.getServerState(baseDN)); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} Obtains a DomainDBCursor for baseDN from newDomainDBCursor, then calls addReplicaDB on it once per server id key of getDomainMap(baseDN), passing the CSN that the supplied server state holds for that server id, or null when the state itself is null; the cursor is then returned. NOTE(review): the paired signature and statement lines below look like the before and after sides of a diff (startAfterState versus startState plus positionStrategy) - confirm which side is current. */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // Look up this server's CSN in the supplied state (null when no state was given); it is handed to addReplicaDB for this replica |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |