mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.30.2013 0a51f5fbeb5e99c52f1be8973ae656de34fab75f
OPENDJ-1116 Introduce abstraction for the changelog DB

Removed avoidable references to replicationServerDomain.getReplicationServer().
Code cleanups:
- collapsed if statements
- applied early exits
- Replaced useless use of StringBuilders with string concatenations
- Removed useless parentheses


*ServerHandler.java:
Replaced calls to this.replicationServerDomain.getReplicationServer() with direct use of this.replicationServer.

DataServerHandler.java:
Inlined local variables.
Removed useless catch.

ECLServerHandler.java:
Extracted method releaseIterator().

LightweightServerHandler.java:
Extracted method getLocalRSMonitorInstanceName().

ReplicationServerHandler.java
In isRemoteLDAPServer(), renamed parameter.


MessageHandler.java:
Extracted methods collectAllIteratorsWithChanges(), addIteratorIfNotEmpty(), releaseAllIterators().

ReplicationServerDomain.java
Renamed replicationServer to localReplicationServer.
In put(), renamed id to serverId + extracted method publishMessage().
In getChangelogIterator(), renamed parameter.
Extracted methods replyWithMonitorMsg(), replyWithUnroutableMsgType(), forwardMsgToAllServers(), replyWithUnreachablePeerMsg() from process().
Renamed stopDbHandlers() to shutdownDbHandlers().
Extracted isServerConnected() from getEligibleCN().



DbHandler.java, JEReplicationIterator.java:
Renamed parameter.
8 files modified
1112 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 155 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 58 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java 43 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 143 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 536 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 151 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java 13 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -27,35 +27,21 @@
 */
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
/**
 * This class defines a server handler, which handles all interaction with a
 * peer server (RS or DS).
@@ -104,11 +90,11 @@
   * @param newGenId The new generation id to take into account
   * @throws IOException If IO error occurred.
   */
  public void changeStatusForResetGenId(long newGenId)
  throws IOException
  public void changeStatusForResetGenId(long newGenId) throws IOException
  {
    StatusMachineEvent event;
    final int localRsServerId = replicationServer.getServerId();
    StatusMachineEvent event;
    if (newGenId == -1)
    {
      // The generation id is being made invalid, let's put the DS
@@ -127,9 +113,8 @@
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " +
                replicationServerDomain.getReplicationServer().getServerId() +
                ". Closing connection to DS " + getServerId() +
                "In RS " + localRsServerId +
                ", closing connection to DS " + getServerId() +
                " for baseDn " + getBaseDN() +
                " to force reconnection as new local" +
                " generationId and remote one match and DS is in bad gen id: " +
@@ -140,20 +125,19 @@
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock realeases it.
          if (session != null)
          // after the reader thread that took the RSD lock releases it.
          if (session != null
              // V4 protocol introduced a StopMsg to properly close the
              // connection between servers
             && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // V4 protocol introduces a StopMsg to properly close the
            // connection between servers
            if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
            try
            {
              try
              {
                session.publish(new StopMsg());
              } catch (IOException ioe)
              {
                // Anyway, going to close session, so nothing to do
              }
              session.publish(new StopMsg());
            }
            catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
@@ -165,12 +149,10 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " +
                replicationServerDomain.getReplicationServer().getServerId() +
                ". DS " + getServerId() + " for baseDn " + getBaseDN() +
                " has already generation id " + newGenId +
            " so no ChangeStatusMsg sent to him.");
            TRACER.debugInfo("In RS " + localRsServerId + ". DS "
                + getServerId() + " for baseDn " + getBaseDN()
                + " has already generation id " + newGenId
                + " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
@@ -182,14 +164,13 @@
      }
    }
    if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
        (status == ServerStatus.FULL_UPDATE_STATUS))
    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
        && status == ServerStatus.FULL_UPDATE_STATUS)
    {
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
          Integer.toString(replicationServerDomain.
              getReplicationServer().getServerId()),
              Integer.toString(localRsServerId),
              getBaseDN(),
              Integer.toString(serverId),
              Long.toString(generationId),
@@ -214,11 +195,9 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " +
          replicationServerDomain.getReplicationServer().getServerId() +
          " Sending change status for reset gen id to " + getServerId() +
          " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + localRsServerId
          + " Sending change status for reset gen id to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
    session.publish(csMsg);
@@ -257,11 +236,9 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " +
          replicationServerDomain.getReplicationServer().getServerId() +
          " Sending change status from status analyzer to " + getServerId() +
          " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + " Sending change status from status analyzer to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
    session.publish(csMsg);
@@ -296,14 +273,13 @@
    // Add the specific DS ones
    attributes.add(Attributes.create("replica", serverURL));
    attributes.add(Attributes.create("connected-to",
        this.replicationServerDomain.getReplicationServer()
        .getMonitorInstanceName()));
        this.replicationServer.getMonitorInstanceName()));
    MonitorData md = replicationServerDomain.getDomainMonitorData();
    // Oldest missing update
    Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
    if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
    long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
    if (approxFirstMissingDate > 0)
    {
      Date date = new Date(approxFirstMissingDate);
      attributes.add(Attributes.create(
@@ -314,14 +290,12 @@
    }
    // Missing changes
    long missingChanges = md.getMissingChanges(serverId);
    attributes.add(Attributes.create("missing-changes", String
        .valueOf(missingChanges)));
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(md.getMissingChanges(serverId))));
    // Replication delay
    long delay = md.getApproxDelay(serverId);
    attributes.add(Attributes.create("approximate-delay", String
        .valueOf(delay)));
    attributes.add(Attributes.create("approximate-delay",
        String.valueOf(md.getApproxDelay(serverId))));
    /* get the Server State */
    AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -541,18 +515,9 @@
      {
        Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
          Integer.toString(inServerStartMsg.getServerId()),
          Integer.toString(replicationServerDomain.getReplicationServer().
          getServerId()));
          Integer.toString(replicationServer.getServerId()));
        throw new DirectoryException(ResultCode.OTHER, errMessage);
      }
      catch (NotSupportedOldVersionPDUException e)
      {
        // We do not need to support DS V1 connection, we just accept RS V1
        // connection:
        // We just trash the message, log the event for debug purpose and close
        // the connection
        throw new DirectoryException(ResultCode.OTHER, null, null);
      }
      catch (Exception e)
      {
        // We do not need to support DS V1 connection, we just accept RS V1
@@ -588,7 +553,7 @@
    }
    finally
    {
      if ((replicationServerDomain != null) &&
      if (replicationServerDomain != null &&
          replicationServerDomain.hasLock())
        replicationServerDomain.release();
    }
@@ -610,21 +575,20 @@
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      startMsg = new ReplServerStartMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold());
          replicationServer.getDegradedStatusThreshold());
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold(), replicationServer.getWeight(),
          replicationServer.getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
    }
@@ -651,17 +615,10 @@
  {
    if (serverId != 0)
    {
      StringBuilder builder = new StringBuilder("Replica DS(");
      builder.append(serverId);
      builder.append(") for domain \"");
      builder.append(replicationServerDomain.getBaseDn());
      builder.append("\"");
      return builder.toString();
      return "Replica DS(" + serverId + ") for domain \""
          + replicationServerDomain.getBaseDn() + "\"";
    }
    else
    {
      return "Unknown server";
    }
    return "Unknown server";
  }
  /**
@@ -740,7 +697,7 @@
    else
    {
      // We are an empty ReplicationServer
      if ((generationId > 0) && (!getServerState().isEmpty()))
      if (generationId > 0 && !getServerState().isEmpty())
      {
        // If the LDAP server has already sent changes
        // it is not expected to connect to an empty RS
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,20 +365,19 @@
    if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      startMsg = new ReplServerStartMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold());
       startMsg = new ReplServerStartMsg(getReplicationServerId(),
           getReplicationServerURL(), getBaseDN(), maxRcvWindow,
           replicationServerDomain.getDbServerState(),
           localGenerationId, sslEncryption, getLocalGroupId(),
           replicationServer.getDegradedStatusThreshold());
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          new ServerState(), localGenerationId, sslEncryption,
          getLocalGroupId(), 0, replicationServer.getWeight(), 0);
       startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
           getReplicationServerURL(), getBaseDN(), maxRcvWindow,
           new ServerState(), localGenerationId, sslEncryption,
           getLocalGroupId(), 0, replicationServer.getWeight(), 0);
    }
    send(startMsg);
@@ -556,15 +555,13 @@
    catch(DirectoryException de)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, de);
      if (draftCNDbIter != null)
        draftCNDbIter.releaseCursor();
      releaseIterator();
      throw de;
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      if (draftCNDbIter != null)
        draftCNDbIter.releaseCursor();
      releaseIterator();
      throw new DirectoryException(
          ResultCode.OPERATIONS_ERROR,
          Message.raw(Category.SYNC,
@@ -917,11 +914,7 @@
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " shutdown()" + draftCNDbIter);
    if (this.draftCNDbIter != null)
    {
      draftCNDbIter.releaseCursor();
      draftCNDbIter = null;
    }
    releaseIterator();
    for (DomainContext domainCtxt : domainCtxts) {
      if (!domainCtxt.unRegisterHandler()) {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -934,6 +927,15 @@
    domainCtxts = null;
  }
  private void releaseIterator()
  {
    if (this.draftCNDbIter != null)
    {
      this.draftCNDbIter.releaseCursor();
      this.draftCNDbIter = null;
    }
  }
  /**
   * Request to shutdown the associated writer.
   */
@@ -1112,7 +1114,7 @@
        {
          session.publish(
            new ErrorMsg(
             replicationServerDomain.getReplicationServer().getServerId(),
             replicationServer.getServerId(),
             serverId,
             Message.raw(Category.SYNC, Severity.INFORMATION,
                 "Exception raised: " + e.getMessage())));
@@ -1130,11 +1132,9 @@
    registerIntoDomain();
    if (debugEnabled())
      TRACER.debugInfo(
          this.getClass().getCanonicalName()+ " " + operationId +
          " initialized: " +
          " " + dumpState() + " " +
          " " + clDomCtxtsToString(""));
      TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
          + " initialized: " + " " + dumpState() + " " + " "
          + clDomCtxtsToString(""));
  }
  private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1522,12 +1522,8 @@
      searchPhase = UNDEFINED_PHASE;
    }
    if (draftCNDbIter!=null)
    {
      // End of INIT_PHASE => always release the iterator
      draftCNDbIter.releaseCursor();
      draftCNDbIter = null;
    }
    // End of INIT_PHASE => always release the iterator
    releaseIterator();
  }
  /**
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -27,14 +27,7 @@
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
@@ -50,6 +43,8 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * This class defines a server handler dedicated to the remote LDAP servers
 * connected to a remote Replication Server.
@@ -142,13 +137,15 @@
    this.protocolVersion = protocolVersion;
    if (debugEnabled())
      TRACER.debugInfo(
        "In " +
  replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
        " LWSH for remote server " + this.serverId +
        " connected to:" + this.replServerHandler.getMonitorInstanceName() +
        " ()");
}
      TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
          + " LWSH for remote server " + this.serverId + " connected to:"
          + this.replServerHandler.getMonitorInstanceName() + " ()");
  }
  /**
   * Returns the monitor instance name of the replication server that is
   * reachable through {@code rsDomain}. Used to prefix the debug traces
   * emitted by this handler.
   *
   * @return the value of {@code getMonitorInstanceName()} for the replication
   *         server returned by {@code rsDomain.getReplicationServer()}
   */
  private String getLocalRSMonitorInstanceName()
  {
    return rsDomain.getReplicationServer().getMonitorInstanceName();
  }
  /**
   * Creates a DSInfo structure representing this remote DS.
@@ -176,15 +173,11 @@
  public void startHandler()
  {
    if (debugEnabled())
      TRACER.debugInfo(
      "In " +
replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
      " LWSH for remote server " + this.serverId +
      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
          " start");
      TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
          + " LWSH for remote server " + this.serverId + " connected to:"
          + this.replServerHandler.getMonitorInstanceName() + " start");
    DirectoryServer.deregisterMonitorProvider(this);
    DirectoryServer.registerMonitorProvider(this);
  }
  /**
@@ -193,10 +186,8 @@
  public void stopHandler()
  {
    if (debugEnabled())
      TRACER.debugInfo("In "
          + replServerHandler.getDomain().getReplicationServer()
              .getMonitorInstanceName() + " LWSH for remote server "
          + this.serverId + " connected to:"
      TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
          + " LWSH for remote server " + this.serverId + " connected to:"
          + this.replServerHandler.getMonitorInstanceName() + " stop");
    DirectoryServer.deregisterMonitorProvider(this);
  }
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@
           *           unlock memory tree
           *           restart as usual
           *   load this change on the delayList
           *
           */
          SortedSet<ReplicationIterator> iteratorSortedSet =
              new TreeSet<ReplicationIterator>(
                  new ReplicationIteratorComparator());
          SortedSet<ReplicationIterator> iteratorSortedSet = null;
          try
          {
            /* fill the lateQueue */
            for (int serverId : replicationServerDomain.getServers())
            {
              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
              ReplicationIterator iterator = replicationServerDomain
                  .getChangelogIterator(serverId, lastCsn);
              if (iterator != null)
              {
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            iteratorSortedSet = collectAllIteratorsWithChanges();
            /* fill the lateQueue */
            // The loop below relies on the fact that it is sorted based
            // on the currentChange of each iterator to consider the next
            // change across all servers.
@@ -320,22 +301,12 @@
              ReplicationIterator iterator = iteratorSortedSet.first();
              iteratorSortedSet.remove(iterator);
              lateQueue.add(iterator.getChange());
              if (iterator.next())
              {
                iteratorSortedSet.add(iterator);
              }
              else
              {
                iterator.releaseCursor();
              }
              addIteratorIfNotEmpty(iteratorSortedSet, iterator);
            }
          }
          finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
            releaseAllIterators(iteratorSortedSet);
          }
          /*
@@ -343,7 +314,6 @@
           * messages in the replication log so the remote server is not
           * late anymore.
           */
          if (lateQueue.isEmpty())
          {
            synchronized (msgQueue)
@@ -430,6 +400,19 @@
    return null;
  }
  private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
      ReplicationIterator iter)
  {
    if (iter.next())
    {
      iterators.add(iter);
    }
    else
    {
      iter.releaseCursor();
    }
  }
  /**
   * Get the older Change Number for that server.
   * Returns null when the queue is empty.
@@ -450,7 +433,12 @@
      }
      else
      {
        if (lateQueue.isEmpty())
        if (!lateQueue.isEmpty())
        {
          UpdateMsg msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
        else
        {
          /*
          following is false AND lateQueue is empty
@@ -460,36 +448,10 @@
          there. So let's take the last change not sent directly from
          the db.
          */
          SortedSet<ReplicationIterator> iteratorSortedSet =
              new TreeSet<ReplicationIterator>(
                  new ReplicationIteratorComparator());
          SortedSet<ReplicationIterator> iteratorSortedSet = null;
          try
          {
            // Build a list of candidates iterator (i.e. db i.e. server)
            for (int serverId : replicationServerDomain.getServers())
            {
              // get the last already sent CN from that server
              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
              // get an iterator in this server db from that last change
              ReplicationIterator iterator =
                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
              /*
              if that iterator has changes, then it is a candidate
              it is added in the sorted list at a position given by its
              current change (see ReplicationIteratorComparator).
              */
              if (iterator != null)
              {
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            iteratorSortedSet = collectAllIteratorsWithChanges();
            UpdateMsg msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          } catch (Exception e)
@@ -497,21 +459,58 @@
            result = null;
          } finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
            releaseAllIterators(iteratorSortedSet);
          }
        } else
        {
          UpdateMsg msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
      }
    }
    return result;
  }
  private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
  {
    SortedSet<ReplicationIterator> results =
        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
    // Build a list of candidates iterator (i.e. db i.e. server)
    for (int serverId : replicationServerDomain.getServers())
    {
      // get the last already sent CN from that server
      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
      // get an iterator in this server db from that last change
      ReplicationIterator iter =
        replicationServerDomain.getChangelogIterator(serverId, lastCsn);
      /*
      if that iterator has changes, then it is a candidate
      it is added in the sorted list at a position given by its
      current change (see ReplicationIteratorComparator).
      */
      if (iter != null)
      {
        if (iter.getChange() != null)
        {
          results.add(iter);
        }
        else
        {
          iter.releaseCursor();
        }
      }
    }
    return results;
  }
  private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
  {
    if (iterators != null)
    {
      for (ReplicationIterator iter : iterators)
      {
        iter.releaseCursor();
      }
    }
  }
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -118,7 +118,8 @@
   */
  private final Map<Integer, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Integer, DbHandler>();
  private ReplicationServer replicationServer;
  /** The ReplicationServer that created the current instance. */
  private ReplicationServer localReplicationServer;
  /** GenerationId management. */
  private volatile long generationId = -1;
@@ -217,16 +218,16 @@
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ReplicationServerDomain.
   * @param replicationServer the ReplicationServer that created this
   * @param localReplicationServer the ReplicationServer that created this
   *                          replicationServer cache.
   */
  public ReplicationServerDomain(
      String baseDn, ReplicationServer replicationServer)
  public ReplicationServerDomain(String baseDn,
      ReplicationServer localReplicationServer)
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
    this.localReplicationServer = localReplicationServer;
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + replicationServer.getServerId()
        + localReplicationServer.getServerId()
        + ") assured timer for domain \"" + baseDn + "\"", true);
    DirectoryServer.registerMonitorProvider(this);
@@ -245,9 +246,9 @@
  public void put(UpdateMsg update, ServerHandler sourceHandler)
    throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    int id = cn.getServerId();
    int serverId = cn.getServerId();
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
@@ -297,7 +298,7 @@
        {
          // Unknown assured mode: should never happen
          Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
            Integer.toString(replicationServer.getServerId()),
            Integer.toString(localReplicationServer.getServerId()),
            assuredMode.toString(), baseDn, update.toString());
          logError(errorMsg);
          assuredMessage = false;
@@ -308,40 +309,11 @@
      }
    }
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler;
    synchronized (sourceDbHandlers)
    if (!publishMessage(update, serverId))
    {
      dbHandler = sourceDbHandlers.get(id);
      if (dbHandler == null)
      {
        try
        {
          dbHandler = replicationServer.newDbHandler(id, baseDn);
          generationIdSavedStatus = true;
        } catch (ChangelogException e)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This replicationServer therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          return;
        }
        sourceDbHandlers.put(id, dbHandler);
      }
      return;
    }
    // Publish the messages to the source handler
    dbHandler.add(update);
    List<Integer> expectedServers = null;
    if (assuredMessage)
    {
@@ -363,7 +335,7 @@
        // times out)
        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
        assuredTimeoutTimer.schedule(assuredTimeoutTask,
          replicationServer.getAssuredTimeout());
            localReplicationServer.getAssuredTimeout());
        // Purge timer every 100 treated messages
        assuredTimeoutTimerPurgeCounter++;
        if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
@@ -408,8 +380,9 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("In Replication Server "
                + replicationServer.getReplicationPort() + " " + baseDn + " "
                + replicationServer.getServerId() + " for dn " + baseDn
                + localReplicationServer.getReplicationPort() + " " + baseDn
                + " "
                + localReplicationServer.getServerId() + " for dn " + baseDn
                + ", update " + update.getChangeNumber()
                + " will not be sent to replication server "
                + handler.getServerId() + " with generation id "
@@ -464,7 +437,7 @@
          }
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
          {
            TRACER.debugInfo("In RS " + replicationServer.getServerId()
            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                + " for dn " + baseDn + ", update " + update.getChangeNumber()
                + " will not be sent to directory server "
                + handler.getServerId() + " as it is in full update");
@@ -484,6 +457,44 @@
    }
  }
  private boolean publishMessage(UpdateMsg update, int serverId)
  {
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler;
    synchronized (sourceDbHandlers)
    {
      dbHandler = sourceDbHandlers.get(serverId);
      if (dbHandler == null)
      {
        try
        {
          dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
          generationIdSavedStatus = true;
        } catch (ChangelogException e)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This replicationServer therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          localReplicationServer.shutdown();
          return false;
        }
        sourceDbHandlers.put(serverId, dbHandler);
      }
    }
    // Publish the messages to the source handler
    dbHandler.add(update);
    return true;
  }
  private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
      UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
      boolean assuredMessage, List<Integer> expectedServers)
@@ -557,7 +568,7 @@
    UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    byte groupId = replicationServer.getGroupId();
    byte groupId = localReplicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    List<Integer> expectedServers = new ArrayList<Integer>();
    List<Integer> wrongStatusServers = new ArrayList<Integer>();
@@ -642,13 +653,13 @@
    ChangeNumber cn = update.getChangeNumber();
    boolean interestedInAcks = false;
    byte safeDataLevel = update.getSafeDataLevel();
    byte groupId = replicationServer.getGroupId();
    byte groupId = localReplicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    if (safeDataLevel < (byte) 1)
    {
      // Should never happen
      Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
        Integer.toString(replicationServer.getServerId()),
        Integer.toString(localReplicationServer.getServerId()),
        Byte.toString(safeDataLevel), baseDn, update.toString());
      logError(errorMsg);
    } else if (sourceGroupId == groupId
@@ -799,7 +810,7 @@
             */
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Integer.toString(replicationServer.getServerId()),
              Integer.toString(localReplicationServer.getServerId()),
              Integer.toString(origServer.getServerId()),
              cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
@@ -862,7 +873,7 @@
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS " + replicationServer.getServerId()
            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                    + " for "+ baseDn
                    + ", sending timeout for assured update with change "
                    + " number " + cn + " to server id "
@@ -879,7 +890,7 @@
             */
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
                Integer.toString(replicationServer.getServerId()),
                Integer.toString(localReplicationServer.getServerId()),
                Integer.toString(origServer.getServerId()),
                cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
@@ -987,7 +998,7 @@
    {
      // looks like two connected LDAP servers have the same serverId
      Message message = ERR_DUPLICATE_SERVER_ID.get(
          replicationServer.getMonitorInstanceName(),
          localReplicationServer.getMonitorInstanceName(),
          directoryServers.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
      logError(message);
@@ -1007,7 +1018,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the server handler "
          + handler.getMonitorInstanceName());
    }
@@ -1045,7 +1057,8 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
            TRACER.debugInfo("In "
                + localReplicationServer.getMonitorInstanceName()
                + " remote server " + handler.getMonitorInstanceName()
                + " is the last RS/DS to be stopped:"
                + " stopping monitoring publisher");
@@ -1078,7 +1091,7 @@
            if (debugEnabled())
            {
              TRACER.debugInfo("In "
                  + replicationServer.getMonitorInstanceName()
                  + localReplicationServer.getMonitorInstanceName()
                  + " remote server " + handler.getMonitorInstanceName()
                  + " is the last DS to be stopped: stopping status analyzer");
            }
@@ -1128,7 +1141,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the message handler "
          + handler.getMonitorInstanceName());
    }
@@ -1207,8 +1221,8 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS "
          + this.replicationServer.getMonitorInstanceName() + " for " + baseDn
          + " " + " mayResetGenerationId generationIdSavedStatus="
          + this.localReplicationServer.getMonitorInstanceName()
          + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus="
          + generationIdSavedStatus);
    }
@@ -1225,7 +1239,7 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS "
                + this.replicationServer.getMonitorInstanceName() + " for "
                + this.localReplicationServer.getMonitorInstanceName() + " for "
                + baseDn + " " + " mayResetGenerationId skip RS"
                + rsh.getMonitorInstanceName() + " that has different genId");
          }
@@ -1236,7 +1250,7 @@
            if (debugEnabled())
            {
              TRACER.debugInfo("In RS "
                  + this.replicationServer.getMonitorInstanceName()
                  + this.localReplicationServer.getMonitorInstanceName()
                  + " for "+ baseDn + " mayResetGenerationId RS"
                  + rsh.getMonitorInstanceName()
                  + " has servers connected to it"
@@ -1252,7 +1266,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RS "
            + this.replicationServer.getMonitorInstanceName() + " for "
            + this.localReplicationServer.getMonitorInstanceName() + " for "
            + baseDn + " "
            + " has servers connected to it - will not reset generationId");
      }
@@ -1292,7 +1306,7 @@
        // looks like two replication servers have the same serverId
        // log an error message and drop this connection.
        Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
          replicationServer.getMonitorInstanceName(), oldHandler.
          localReplicationServer.getMonitorInstanceName(), oldHandler.
          getServerAddressURL(), handler.getServerAddressURL(),
          handler.getServerId());
        throw new DirectoryException(ResultCode.OTHER, message);
@@ -1372,12 +1386,12 @@
   * and locks used by the ReplicationIterator.
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @param startAfterCN Starting point for the iterator.
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getChangelogIterator(int serverId,
      ChangeNumber changeNumber)
      ChangeNumber startAfterCN)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
@@ -1388,7 +1402,7 @@
    ReplicationIterator it;
    try
    {
      it = handler.generateIterator(changeNumber);
      it = handler.generateIterator(startAfterCN);
    }
    catch (Exception e)
    {
@@ -1535,14 +1549,15 @@
  }
  /**
   * Processes a message coming from one server in the topology
   * and potentially forwards it to one or all other servers.
   * Processes a message coming from one server in the topology and potentially
   * forwards it to one or all other servers.
   *
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
   * the message.
   * @param msg
   *          The message received and to be processed.
   * @param msgEmitter
   *          The server handler of the server that emitted the message.
   */
  public void process(RoutableMsg msg, ServerHandler senderHandler)
  public void process(RoutableMsg msg, ServerHandler msgEmitter)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
@@ -1551,158 +1566,176 @@
        !(msg instanceof InitializeRcvAckMsg) &&
        !(msg instanceof EntryMsg) &&
        !(msg instanceof DoneMsg) &&
        (msg.getDestination() == this.replicationServer.getServerId()))
        (msg.getDestination() == this.localReplicationServer.getServerId()))
    {
      if (msg instanceof ErrorMsg)
      {
        ErrorMsg errorMsg = (ErrorMsg) msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(
          errorMsg.getDetails()));
        logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
      } else if (msg instanceof MonitorRequestMsg)
      {
        // If the request comes from a Directory Server we need to
        // build the full list of all servers in the topology
        // and send back a MonitorMsg with the full list of all the servers
        // in the topology.
        if (senderHandler.isDataServer())
        {
          // Monitoring information requested by a DS
          MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
              msg.getDestination(), msg.getSenderID(), monitorData);
          if (monitorMsg != null)
          {
            try
            {
              senderHandler.send(monitorMsg);
            }
            catch (IOException e)
            {
              // the connection was closed.
            }
          }
          return;
        } else
        {
          // Monitoring information requested by a RS
          MonitorMsg monitorMsg =
            createLocalTopologyMonitorMsg(msg.getDestination(),
            msg.getSenderID());
          if (monitorMsg != null)
          {
            try
            {
              senderHandler.send(monitorMsg);
            } catch (Exception e)
            {
              // We log the error. The requestor will detect a timeout or
              // any other failure on the connection.
              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                  Integer.toString(msg.getDestination())));
            }
          }
        }
        replyWithMonitorMsg(msg, msgEmitter);
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
        receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
      } else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
          msg.getClass().getCanonicalName()));
        MessageBuilder mb1 = new MessageBuilder();
        mb1.append(
            NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName()));
        mb1.append("serverID:").append(msg.getDestination());
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb1.toMessage());
        try
        {
          senderHandler.send(errMsg);
        } catch (IOException ioe1)
        {
          // an error happened on the sender session trying to recover
          // from an error on the receiver session.
          // Not much more we can do at this point.
        }
        replyWithUnroutableMsgType(msgEmitter, msg);
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
    if (servers.isEmpty())
    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
    if (!servers.isEmpty())
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
          this.baseDn, Integer.toString(msg.getDestination())));
      mb.append(" In Replication Server=").append(
        this.replicationServer.getMonitorInstanceName());
      mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
      mb.append(" Details:routing table is empty");
      ErrorMsg errMsg = new ErrorMsg(
        this.replicationServer.getServerId(),
        msg.getSenderID(),
        mb.toMessage());
      logError(mb.toMessage());
      forwardMsgToAllServers(msg, servers, msgEmitter);
    }
    else
    {
      replyWithUnreachablePeerMsg(msgEmitter, msg);
    }
  }
  private void replyWithMonitorMsg(RoutableMsg msg, ServerHandler msgEmitter)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (msgEmitter.isDataServer())
    {
      // Monitoring information requested by a DS
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(), monitorData);
      try
      {
        senderHandler.send(errMsg);
      } catch (IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying to send an error msg to this server.
         * Log an error and close the connection to this server.
         */
        MessageBuilder mb2 = new MessageBuilder();
        mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
        mb2.append(stackTraceToSingleLineString(ioe));
        logError(mb2.toMessage());
        stopServer(senderHandler, false);
        msgEmitter.send(monitorMsg);
      }
    } else
      catch (IOException e)
      {
        // the connection was closed.
      }
    }
    else
    {
      for (ServerHandler targetHandler : servers)
      // Monitoring information requested by a RS
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      if (monitorMsg != null)
      {
        try
        {
          targetHandler.send(msg);
        } catch (IOException ioe)
          msgEmitter.send(monitorMsg);
        }
        catch (IOException e)
        {
          /*
           * An error happened trying the send a routable message
           * to its destination server.
           * Send back an error to the originator of the message.
           */
          MessageBuilder mb1 = new MessageBuilder();
          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
              this.baseDn, Integer.toString(msg.getDestination())));
          mb1.append(" unroutable message =" + msg.getClass().getSimpleName());
          mb1.append(" Details: " + ioe.getLocalizedMessage());
          ErrorMsg errMsg = new ErrorMsg(
            msg.getSenderID(), mb1.toMessage());
          logError(mb1.toMessage());
          try
          {
            senderHandler.send(errMsg);
          } catch (IOException ioe1)
          {
            // an error happened on the sender session trying to recover
            // from an error on the receiver session.
            // We don't have much solution left beside closing the sessions.
            stopServer(senderHandler, false);
            stopServer(targetHandler, false);
          }
        // TODO Handle error properly (sender timeout in addition)
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
              .getDestination())));
        }
      }
    }
  }
  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
      RoutableMsg msg)
  {
    String msgClassname = msg.getClass().getCanonicalName();
    logError(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
    MessageBuilder mb = new MessageBuilder();
    mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
    mb.append("serverID:").append(msg.getDestination());
    ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage());
    try
    {
      msgEmitter.send(errMsg);
    }
    catch (IOException ignored)
    {
      // an error happened on the sender session trying to recover
      // from an error on the receiver session.
      // Not much more we can do at this point.
    }
  }
  private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter,
      RoutableMsg msg)
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
        this.baseDn, Integer.toString(msg.getDestination())));
    mb.append(" In Replication Server=").append(
      this.localReplicationServer.getMonitorInstanceName());
    mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
    mb.append(" Details:routing table is empty");
    final Message message = mb.toMessage();
    logError(message);
    ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(),
        msg.getSenderID(), message);
    try
    {
      msgEmitter.send(errMsg);
    }
    catch (IOException ignored)
    {
      // TODO Handle error properly (sender timeout in addition)
      /*
       * An error happened trying to send an error msg to this server.
       * Log an error and close the connection to this server.
       */
      MessageBuilder mb2 = new MessageBuilder();
      mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
      mb2.append(stackTraceToSingleLineString(ignored));
      logError(mb2.toMessage());
      stopServer(msgEmitter, false);
    }
  }
  private void forwardMsgToAllServers(RoutableMsg msg,
      List<ServerHandler> servers, ServerHandler msgEmitter)
  {
    for (ServerHandler targetHandler : servers)
    {
      try
      {
        targetHandler.send(msg);
      } catch (IOException ioe)
      {
        /*
         * An error happened trying to send a routable message to its
         * destination server.
         * Send back an error to the originator of the message.
         */
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
            this.baseDn, Integer.toString(msg.getDestination())));
        mb.append(" unroutable message =" + msg.getClass().getSimpleName());
        mb.append(" Details: " + ioe.getLocalizedMessage());
        final Message message = mb.toMessage();
        logError(message);
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
        try
        {
          msgEmitter.send(errMsg);
        } catch (IOException ioe1)
        {
          // an error happened on the sender session trying to recover
          // from an error on the receiver session.
          // We don't have much solution left beside closing the sessions.
          stopServer(msgEmitter, false);
          stopServer(targetHandler, false);
        }
      // TODO Handle error properly (sender timeout in addition)
      }
    }
  }
  /**
   * Creates a new monitor message including monitoring information for the
@@ -1720,13 +1753,11 @@
  public MonitorMsg createGlobalTopologyMonitorMsg(
      int sender, int destination, MonitorData monitorData)
  {
    MonitorMsg returnMsg =
      new MonitorMsg(sender, destination);
    final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
    returnMsg.setReplServerDbState(getDbServerState());
    // Add the informations about the Replicas currently in
    // the topology.
    // Add the informations about the Replicas currently in the topology.
    Iterator<Integer> it = monitorData.ldapIterator();
    while (it.hasNext())
    {
@@ -1736,8 +1767,7 @@
          monitorData.getApproxFirstMissingDate(replicaId), true);
    }
    // Add the information about the Replication Servers
    // currently in the topology.
    // Add the information about the RSs currently in the topology.
    it = monitorData.rsIterator();
    while (it.hasNext())
    {
@@ -1787,16 +1817,14 @@
      for (DataServerHandler lsh : this.directoryServers.values())
      {
        monitorMsg.setServerState(lsh.getServerId(),
            lsh.getServerState(), lsh.getApproxFirstMissingDate(),
            true);
            lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      {
        monitorMsg.setServerState(rsh.getServerId(),
            rsh.getServerState(), rsh.getApproxFirstMissingDate(),
            false);
            rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
      }
      // Populate the RS state in the msg from the DbState
@@ -1821,15 +1849,12 @@
    stopAllServers(true);
    stopDbHandlers();
    shutdownDbHandlers();
  }
  /**
   * Stop the dbHandlers .
   */
  private void stopDbHandlers()
  /** Shutdown all the dbHandlers. */
  private void shutdownDbHandlers()
  {
    // Shutdown the dbHandlers
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
@@ -1964,9 +1989,7 @@
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
    rsInfos.add(localRSInfo);
    rsInfos.add(toRSInfo(localReplicationServer, generationId));
    return new TopologyMsg(dsInfos, rsInfos);
  }
@@ -1982,10 +2005,8 @@
   */
  public TopologyMsg createTopologyMsgForDS(int destDsId)
  {
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    // Go through every DSs (except recipient of msg)
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    for (DataServerHandler serverHandler : directoryServers.values())
    {
      if (serverHandler.getServerId() == destDsId)
@@ -1995,15 +2016,15 @@
      dsInfos.add(serverHandler.toDSInfo());
    }
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    // Add our own info (local RS)
    RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
    rsInfos.add(localRSInfo);
    rsInfos.add(toRSInfo(localReplicationServer, generationId));
    // Go through every peer RSs (and get their connected DSs), also add info
    // for RSs
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    {
      // Put RS info
      rsInfos.add(serverHandler.toRSInfo());
      serverHandler.addDSInfos(dsInfos);
@@ -2354,11 +2375,11 @@
          logError(mb.toMessage());
        }
      }
      stopDbHandlers();
      shutdownDbHandlers();
    }
    try
    {
      replicationServer.clearGenerationId(baseDn);
      localReplicationServer.clearGenerationId(baseDn);
    } catch (Exception e)
    {
      // TODO: i18n
@@ -2381,7 +2402,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " baseDN=" + baseDn + " isDegraded serverId=" + serverId
          + " given local generation Id=" + this.generationId);
    }
@@ -2398,7 +2420,8 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " baseDN=" + baseDn + " Compute degradation of serverId="
          + serverId + " LS server generation Id=" + handler.getGenerationId());
    }
@@ -2411,7 +2434,7 @@
   */
  public ReplicationServer getReplicationServer()
  {
    return replicationServer;
    return localReplicationServer;
  }
  /**
@@ -2557,7 +2580,7 @@
              int serverId = rs.getServerId();
              MonitorRequestMsg msg = new MonitorRequestMsg(
                  this.replicationServer.getServerId(), serverId);
                  this.localReplicationServer.getServerId(), serverId);
              try
              {
                rs.send(msg);
@@ -2684,7 +2707,7 @@
    // - from our own local db state
    // - whatever they are directly or indirectly connected
    ServerState dbServerState = getDbServerState();
    pendingMonitorData.setRSState(replicationServer.getServerId(),
    pendingMonitorData.setRSState(localReplicationServer.getServerId(),
        dbServerState);
    for (int serverId : dbServerState) {
      ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
@@ -2744,7 +2767,7 @@
        while (rsidIterator.hasNext())
        {
          int rsid = rsidIterator.next();
          if (rsid == replicationServer.getServerId())
          if (rsid == localReplicationServer.getServerId())
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the fmd of my connected LS
@@ -2895,7 +2918,7 @@
    if (statusAnalyzer == null)
    {
      int degradedStatusThreshold =
        replicationServer.getDegradedStatusThreshold();
        localReplicationServer.getDegradedStatusThreshold();
      if (degradedStatusThreshold > 0) // 0 means no status analyzer
      {
        statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
@@ -2946,7 +2969,7 @@
    if (monitoringPublisher == null)
    {
      long period =
        replicationServer.getMonitoringPublisherPeriod();
        localReplicationServer.getMonitoringPublisherPeriod();
      if (period > 0) // 0 means no monitoring publisher
      {
        monitoringPublisher = new MonitoringPublisher(this, period);
@@ -3004,8 +3027,8 @@
  @Override
  public String getMonitorInstanceName()
  {
    return "Replication server RS(" + replicationServer.getServerId() + ") "
        + replicationServer.getServerURL() + ",cn="
    return "Replication server RS(" + localReplicationServer.getServerId()
        + ") " + localReplicationServer.getServerURL() + ",cn="
        + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication";
  }
@@ -3018,9 +3041,9 @@
    // publish the server id and the port number.
    List<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(Attributes.create("replication-server-id",
        String.valueOf(replicationServer.getServerId())));
        String.valueOf(localReplicationServer.getServerId())));
    attributes.add(Attributes.create("replication-server-port",
        String.valueOf(replicationServer.getReplicationPort())));
        String.valueOf(localReplicationServer.getReplicationPort())));
    // Add all the base DNs that are known by this replication server.
    attributes.add(Attributes.create("domain-name", baseDn));
@@ -3032,7 +3055,7 @@
    MonitorData md = getDomainMonitorData();
    // Missing changes
    long missingChanges = md.getMissingChangesRS(replicationServer
    long missingChanges = md.getMissingChangesRS(localReplicationServer
        .getServerId());
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(missingChanges)));
@@ -3201,30 +3224,13 @@
      }
      */
      boolean serverIdConnected = false;
      if (directoryServers.containsKey(serverId))
      {
        serverIdConnected = true;
      }
      else
      {
        // not directly connected
        for (ReplicationServerHandler rsh : replicationServers.values())
        {
          if (rsh.isRemoteLDAPServer(serverId))
          {
            serverIdConnected = true;
            break;
          }
        }
      }
      if (!serverIdConnected)
      if (!isServerConnected(serverId))
      {
        if (debugEnabled())
        {
          TRACER.debugInfo("In " + "Replication Server "
              + replicationServer.getReplicationPort() + " " + baseDn + " "
              + replicationServer.getServerId() + " Server " + serverId
              + localReplicationServer.getReplicationPort() + " " + baseDn + " "
              + localReplicationServer.getServerId() + " Server " + serverId
              + " is not considered for eligibility ... potentially down");
        }
        continue;
@@ -3246,13 +3252,31 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In Replication Server "
          + replicationServer.getReplicationPort() + " " + baseDn + " "
          + replicationServer.getServerId()
          + localReplicationServer.getReplicationPort() + " " + baseDn + " "
          + localReplicationServer.getServerId()
          + " getEligibleCN() returns result =" + eligibleCN);
    }
    return eligibleCN;
  }
  private boolean isServerConnected(int serverId)
  {
    if (directoryServers.containsKey(serverId))
    {
      return true;
    }
    // not directly connected
    for (ReplicationServerHandler rsHandler : replicationServers.values())
    {
      if (rsHandler.isRemoteLDAPServer(serverId))
      {
        return true;
      }
    }
    return false;
  }
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
@@ -3299,8 +3323,8 @@
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                .get("Replication Server "
                    + replicationServer.getReplicationPort() + " "
                    + baseDn + " " + replicationServer.getServerId()));
                    + localReplicationServer.getReplicationPort() + " "
                    + baseDn + " " + localReplicationServer.getServerId()));
            stopServer(rsHandler, false);
          }
        }
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -46,6 +41,11 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
 * This class defines a server handler, which handles all interaction with a
 * peer replication server.
@@ -82,10 +82,8 @@
      generationId = inReplServerStartMsg.getGenerationId();
      serverId = inReplServerStartMsg.getServerId();
      serverURL = inReplServerStartMsg.getServerURL();
      int separator = serverURL.lastIndexOf(':');
      serverAddressURL =
        session.getRemoteAddress() + ":" + serverURL.substring(separator +
            1);
      final String port = serverURL.substring(serverURL.lastIndexOf(':') + 1);
      serverAddressURL = session.getRemoteAddress() + ":" + port;
      setBaseDNAndDomain(inReplServerStartMsg.getBaseDn(), false);
      setInitialServerState(inReplServerStartMsg.getServerState());
      setSendWindowSize(inReplServerStartMsg.getWindowSize());
@@ -119,8 +117,7 @@
        getReplicationServerId(), getReplicationServerURL(), getBaseDN(),
        maxRcvWindow, replicationServerDomain.getDbServerState(),
        localGenerationId, sslEncryption,
        getLocalGroupId(), replicationServerDomain.getReplicationServer()
            .getDegradedStatusThreshold());
        getLocalGroupId(), replicationServer.getDegradedStatusThreshold());
    send(outReplServerStartMsg);
    return outReplServerStartMsg;
  }
@@ -296,7 +293,7 @@
    finally
    {
      // Release domain
      if ((replicationServerDomain != null) &&
      if (replicationServerDomain != null &&
          replicationServerDomain.hasLock())
        replicationServerDomain.release();
    }
@@ -374,11 +371,9 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              this + " RS V1 with serverID=" + serverId +
              " is connected with the right generation ID");
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                + " " + this + " RS V1 with serverID=" + serverId
                + " is connected with the right generation ID");
          }
        } else
        {
@@ -420,10 +415,9 @@
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(Integer
          .toString(inReplServerStartMsg.getServerId()), Integer
          .toString(replicationServerDomain.getReplicationServer()
              .getServerId()));
      Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
          Integer.toString(inReplServerStartMsg.getServerId()),
          Integer.toString(replicationServer.getServerId()));
      abortStart(errMessage);
    }
    catch (DirectoryException e)
@@ -444,7 +438,7 @@
    }
    finally
    {
      if ((replicationServerDomain != null) &&
      if (replicationServerDomain != null &&
          replicationServerDomain.hasLock())
        replicationServerDomain.release();
    }
@@ -489,12 +483,10 @@
        // connection attempt.
        return null;
      }
      else
      {
        Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
            .getClass().getCanonicalName(), "TopologyMsg");
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(
          msg.getClass().getCanonicalName(), "TopologyMsg");
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    // Remote RS sent his topo msg
@@ -518,10 +510,9 @@
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("In " +
            replicationServerDomain.getReplicationServer().
            getMonitorInstanceName() + " RS with serverID=" + serverId +
            " is connected with the right generation ID, same as local ="
        TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
            + " RS with serverID=" + serverId
            + " is connected with the right generation ID, same as local ="
            + generationId);
      }
    }
@@ -541,42 +532,40 @@
  {
    if (localGenerationId > 0)
    { // the local RS is initialized
      if (generationId > 0)
      { // the remote RS is initialized.
        // If not, there's nothing to do anyway.
        if (generationId != localGenerationId)
        {
          /* Either:
           *
           * 1) The 2 RS have different generationID
           * replicationServerDomain.getGenerationIdSavedStatus() == true
           *
           * if the present RS has received changes regarding its
           * gen ID and so won't change without a reset
           * then  we are just degrading the peer.
           *
           * 2) This RS has never received any changes for the current
           * generation ID.
           *
           * Example case:
           * - we are in RS1
           * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
           * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
           * - we are in RS1 and we receive a START msg from RS2
           * - Each RS keeps its genID / is degraded and when LS2
           * will be populated from LS1 everything will become ok.
           *
           * Issue:
           * FIXME : Would it be a good idea in some cases to just set the
           * gen ID received from the peer RS specially if the peer has a
           * non null state and we have a null state ?
           * replicationServerDomain.setGenerationId(generationId, false);
           */
          Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
                  serverId, session.getReadableRemoteAddress(), generationId,
                  getBaseDN(), getReplicationServerId(), localGenerationId);
          logError(message);
        }
      if (generationId > 0
          // the remote RS is initialized. If not, there's nothing to do anyway.
          && generationId != localGenerationId)
      {
        /* Either:
         *
         * 1) The 2 RS have different generationID
         * replicationServerDomain.getGenerationIdSavedStatus() == true
         *
         * if the present RS has received changes regarding its
         * gen ID and so won't change without a reset
         * then  we are just degrading the peer.
         *
         * 2) This RS has never received any changes for the current
         * generation ID.
         *
         * Example case:
         * - we are in RS1
         * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
         * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
         * - we are in RS1 and we receive a START msg from RS2
         * - Each RS keeps its genID / is degraded and when LS2
         * will be populated from LS1 everything will become ok.
         *
         * Issue:
         * FIXME : Would it be a good idea in some cases to just set the
         * gen ID received from the peer RS specially if the peer has a
         * non null state and we have a null state ?
         * replicationServerDomain.setGenerationId(generationId, false);
         */
        Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
            serverId, session.getReadableRemoteAddress(), generationId,
            getBaseDN(), getReplicationServerId(), localGenerationId);
        logError(message);
      }
    }
    else
@@ -655,9 +644,7 @@
    groupId = rsInfo.getGroupId();
    weight = rsInfo.getWeight();
    /**
     * Store info for DSs connected to the peer RS
     */
    // Store info for DSs connected to the peer RS
    List<DSInfo> dsInfos = topoMsg.getDsList();
    synchronized (remoteDirectoryServers)
@@ -688,18 +675,18 @@
   * When this handler is connected to a replication server, specifies if
   * a wanted server is connected to this replication server.
   *
   * @param wantedServer The server we want to know if it is connected
   * @param serverId The server we want to know if it is connected
   * to the replication server represented by this handler.
   * @return boolean True if the wanted server is connected to the server
   * represented by this handler.
   */
  public boolean isRemoteLDAPServer(int wantedServer)
  public boolean isRemoteLDAPServer(int serverId)
  {
    synchronized (remoteDirectoryServers)
    {
      for (LightweightServerHandler server : remoteDirectoryServers.values())
      {
        if (wantedServer == server.getServerId())
        if (serverId == server.getServerId())
        {
          return true;
        }
@@ -765,9 +752,8 @@
    MonitorData md = replicationServerDomain.getDomainMonitorData();
    // Missing changes
    long missingChanges = md.getMissingChangesRS(serverId);
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(missingChanges)));
        String.valueOf(md.getMissingChangesRS(serverId))));
    /* get the Server State */
    AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -791,17 +777,10 @@
  {
    if (serverId != 0)
    {
      StringBuilder builder = new StringBuilder("Replication server RS(");
      builder.append(serverId);
      builder.append(") for domain \"");
      builder.append(replicationServerDomain.getBaseDn());
      builder.append("\"");
      return builder.toString();
      return "Replication server RS(" + serverId + ") for domain \""
          + replicationServerDomain.getBaseDn() + "\"";
    }
    else
    {
      return "Unknown server";
    }
    return "Unknown server";
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,8 +44,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.replication.server.changelog.je.ReplicationDB
    .ReplServerDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -265,22 +264,20 @@
   * managed by this dbHandler and starting at the position defined
   * by a given changeNumber.
   *
   * @param changeNumber The position where the iterator must start.
   *
   * @param startAfterCN The position where the iterator must start.
   * @return a new ReplicationIterator that allows browsing the db
   *         managed by this dbHandler and starting at the position defined
   *         by a given changeNumber.
   *
   * @throws ChangelogException if a database problem happened.
   */
  public ReplicationIterator generateIterator(ChangeNumber changeNumber)
  public ReplicationIterator generateIterator(ChangeNumber startAfterCN)
      throws ChangelogException
  {
    if (changeNumber == null)
    if (startAfterCN == null)
    {
      flush();
    }
    return new JEReplicationIterator(db, changeNumber, this);
    return new JEReplicationIterator(db, startAfterCN, this);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
@@ -32,8 +32,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.replication.server.changelog.je.ReplicationDB
    .ReplServerDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
/**
 * Berkeley DB JE implementation of ReplicationIterator.
@@ -52,20 +51,20 @@
   * releaseCursor() method.
   *
   * @param db The db where the iterator must be created.
   * @param changeNumber The ChangeNumber after which the iterator must start.
   * @param startAfterCN The ChangeNumber after which the iterator must start.
   * @param dbHandler The associated DbHandler.
   * @throws ChangelogException if a database problem happened.
   */
  public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
  public JEReplicationIterator(ReplicationDB db, ChangeNumber startAfterCN,
      DbHandler dbHandler) throws ChangelogException
  {
    this.db = db;
    this.dbHandler = dbHandler;
    this.lastNonNullCurrentCN = changeNumber;
    this.lastNonNullCurrentCN = startAfterCN;
    try
    {
      cursor = db.openReadCursor(changeNumber);
      cursor = db.openReadCursor(startAfterCN);
    }
    catch(Exception e)
    {
@@ -79,7 +78,7 @@
      dbHandler.flush();
      // look again in the db
      cursor = db.openReadCursor(changeNumber);
      cursor = db.openReadCursor(startAfterCN);
      if (cursor == null)
      {
        throw new ChangelogException(Message.raw("no new change"));