| | |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); |
| | | } |
| | | for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.readChangelogState()); |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = dbEnv.readChangelogState(); |
| | | final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | for (CSN offlineCSN : domainState) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |