mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.04.2014 37fd83b785c0993b0e1f0fb93777943a9ec46d83
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -38,6 +38,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -310,7 +311,7 @@
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.readChangelogState();
      final ChangelogState changelogState = dbEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
@@ -333,7 +334,7 @@
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
@@ -634,7 +635,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(dbEnv.readChangelogState());
      startIndexer(dbEnv.getChangelogState());
    }
    else
    {
@@ -697,14 +698,14 @@
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final ChangelogState state = dbEnv.readChangelogState();
    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
    // recycle exhausted cursors,
@@ -712,13 +713,13 @@
    return new CompositeDBCursor<Void>(cursors, true);
  }
  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  {
    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
    if (domain != null)
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    {
      for (CSN offlineCSN : domain)
      for (CSN offlineCSN : domainState)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))