mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.16.2013 fa73aa0575f97205255d66ffed45d64bc04434ed
ReplicationServerDomain.java:
Changed getCount() return type from int to long (This change goes along with r9386).
Extracted method addUpdate(), isDifferentGenerationId(), isSameGenerationId(), collectRSsEligibleForAssuredReplication(), toRSInfo().
In many places, renamed sid to serverId.
Various cleanups:
- converted comments to javadoc
- added curly braces around if bodies
- collapsed if statements
- used interfaces instead of concrete classes
- removed useless parentheses
- removed useless 'this' qualifier in non static method calls
- put code on one line where it fits


MessageHandler.java:
In add(), removed unused parameter MessageHandler.
In getOlderUpdateCN(), removed useless reassignment of a local variable.
2 files modified
777 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/MessageHandler.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 766 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -140,9 +140,8 @@
   *
   * @param update The update that must be added to the list of updates of
   * this handler.
   * @param sourceHandler The source handler that generated the update.
   */
  public void add(UpdateMsg update, MessageHandler sourceHandler)
  public void add(UpdateMsg update)
  {
    synchronized (msgQueue)
    {
@@ -445,15 +444,13 @@
    {
      if (following)
      {
        if (msgQueue.isEmpty())
        {
          result = null;
        } else
        if (!msgQueue.isEmpty())
        {
          UpdateMsg msg = msgQueue.first();
          result = msg.getChangeNumber();
        }
      } else
      }
      else
      {
        if (lateQueue.isEmpty())
        {
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -33,6 +33,7 @@
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -73,55 +74,56 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
  private final String baseDn;
  // The Status analyzer that periodically verifies if the connected DSs are
  // late or not
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
   * are late.
   */
  private StatusAnalyzer statusAnalyzer = null;
  // The monitoring publisher that periodically sends monitoring messages to the
  // topology
  /**
   * The monitoring publisher that periodically sends monitoring messages to the
   * topology.
   */
  private MonitoringPublisher monitoringPublisher = null;
  /*
   * The following map contains one balanced tree for each replica ID
   * to which we are currently publishing
   * the first update in the balanced tree is the next change that we
   * must push to this particular server
   *
   * We add new TreeSet in the HashMap when a new server register
   * to this replication server.
   *
  /**
   * The following map contains one balanced tree for each replica ID to which
   * we are currently publishing the first update in the balanced tree is the
   * next change that we must push to this particular server.
   * <p>
   * We add new TreeSet in the HashMap when a new server register to this
   * replication server.
   */
  private final Map<Integer, DataServerHandler> directoryServers =
    new ConcurrentHashMap<Integer, DataServerHandler>();
  /*
   * This map contains one ServerHandler for each replication servers
   * with which we are connected (so normally all the replication servers)
   * the first update in the balanced tree is the next change that we
   * must push to this particular server
   *
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
  /**
   * This map contains one ServerHandler for each replication servers with which
   * we are connected (so normally all the replication servers) the first update
   * in the balanced tree is the next change that we must push to this
   * particular server.
   * <p>
   * We add new TreeSet in the HashMap when a new replication server register to
   * this replication server.
   */
  private final Map<Integer, ReplicationServerHandler> replicationServers =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final ConcurrentLinkedQueue<MessageHandler> otherHandlers =
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  /*
   * This map contains the List of updates received from each
   * LDAP server
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<Integer, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Integer, DbHandler>();
  private ReplicationServer replicationServer;
  // GenerationId management
  /** GenerationId management. */
  private volatile long generationId = -1;
  private boolean generationIdSavedStatus = false;
  // The tracer object for the debug logger.
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  // Monitor data management
@@ -130,42 +132,53 @@
   */
  private volatile MonitorData monitorData = new MonitorData();
  // This lock guards against multiple concurrent monitor data recalculation.
  /**
   * This lock guards against multiple concurrent monitor data recalculation.
   */
  private final Object pendingMonitorLock = new Object();
  // Guarded by pendingMonitorLock.
  /** Guarded by pendingMonitorLock. */
  private long monitorDataLastBuildDate = 0;
  // The set of replication servers which are already known to be slow to send
  // monitor data.
  //
  // Guarded by pendingMonitorLock.
  /**
   * The set of replication servers which are already known to be slow to send
   * monitor data.
   * <p>
   * Guarded by pendingMonitorLock.
   */
  private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
  // This lock serializes updates to the pending monitor data.
  /** This lock serializes updates to the pending monitor data. */
  private final Object pendingMonitorDataLock = new Object();
  // Monitor data which is currently being calculated.
  //
  // Guarded by pendingMonitorDataLock.
  /**
   * Monitor data which is currently being calculated. Guarded by
   * pendingMonitorDataLock.
   */
  private MonitorData pendingMonitorData;
  // A set containing the IDs of servers from which we are currently expecting
  // monitor responses. When a response is received from a server we remove the
  // ID from this table, and count down the latch if the ID was in the table.
  //
  // Guarded by pendingMonitorDataLock.
  /**
   * A set containing the IDs of servers from which we are currently expecting
   * monitor responses. When a response is received from a server we remove the
   * ID from this table, and count down the latch if the ID was in the table.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private final Set<Integer> pendingMonitorDataServerIDs =
    new HashSet<Integer>();
  // This latch is non-null and is used in order to count incoming responses as
  // they arrive. Since incoming response may arrive at any time, even when
  // there is no pending monitor request, access to the latch must be guarded.
  //
  // Guarded by pendingMonitorDataLock.
  /**
   * This latch is non-null and is used in order to count incoming responses as
   * they arrive. Since incoming response may arrive at any time, even when
   * there is no pending monitor request, access to the latch must be guarded.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private CountDownLatch pendingMonitorDataLatch = null;
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  /**
   * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
   */
  private final long monitorDataLifeTime = 500;
@@ -173,20 +186,28 @@
  /**
   * The needed info for each received assured update message we are waiting
   * acks for.
   * <p>
   * Key: a change number matching a received update message which requested
   * assured mode usage (either safe read or safe data mode)
   * <p>
   * Value: The object holding every info needed about the already received acks
   * as well as the acks to be received.
   * For more details, see ExpectedAcksInfo and its sub classes javadoc.
   *
   * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub
   *      classes javadoc.
   */
  private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
    new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
  // The timer used to run the timeout code (timer tasks) for the assured update
  // messages we are waiting acks for.
  /**
   * The timer used to run the timeout code (timer tasks) for the assured update
   * messages we are waiting acks for.
   */
  private Timer assuredTimeoutTimer = null;
  // Counter used to purge the timer tasks references in assuredTimeoutTimer,
  // every n number of treated assured messages
  /**
   * Counter used to purge the timer tasks references in assuredTimeoutTimer,
   * every n number of treated assured messages.
   */
  private int assuredTimeoutTimerPurgeCounter = 0;
  private ServerState ctHeartbeatState = null;
@@ -345,10 +366,17 @@
        // Purge timer every 100 treated messages
        assuredTimeoutTimerPurgeCounter++;
        if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
        {
          assuredTimeoutTimer.purge();
        }
      }
    }
    if (expectedServers == null)
    {
      expectedServers = Collections.emptyList();
    }
    /**
     * The update message equivalent to the originally received update message,
     * but with assured flag disabled. This message is the one that should be
@@ -374,42 +402,25 @@
         * Ignore updates to RS with bad gen id
         * (no system managed status for a RS)
         */
        if ( (generationId>0) && (generationId != handler.getGenerationId()) )
        if (isDifferentGenerationId(handler.getGenerationId()))
        {
          if (debugEnabled())
            TRACER.debugInfo("In " + "Replication Server " +
              replicationServer.getReplicationPort() + " " +
              baseDn + " " + replicationServer.getServerId() +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to replication server " +
              handler.getServerId() + " with generation id " +
              handler.getGenerationId() + " different from local " +
              "generation id " + generationId);
          {
            TRACER.debugInfo("In Replication Server "
                + replicationServer.getReplicationPort() + " " + baseDn + " "
                + replicationServer.getServerId() + " for dn " + baseDn
                + ", update " + update.getChangeNumber()
                + " will not be sent to replication server "
                + handler.getServerId() + " with generation id "
                + handler.getGenerationId() + " different from local "
                + "generation id " + generationId);
          }
          continue;
        }
        if (assuredMessage)
        {
          // Assured mode: post an assured or not assured matching update
          // message according to what has been computed for the destination
          // server
          if ((expectedServers != null) && expectedServers.contains(handler.
            getServerId()))
          {
            handler.add(update, sourceHandler);
          } else
          {
            if (notAssuredUpdate == null)
            {
              notAssuredUpdate = new NotAssuredUpdateMsg(update);
            }
            handler.add(notAssuredUpdate, sourceHandler);
          }
        } else
        {
          handler.add(update, sourceHandler);
        }
        notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
            assuredMessage, expectedServers);
      }
    }
@@ -436,58 +447,71 @@
       * allows to have better performances in normal mode (most of the time).
       */
      ServerStatus dsStatus = handler.getStatus();
      if ( (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
        (dsStatus == ServerStatus.FULL_UPDATE_STATUS) )
      if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
          || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
      {
        if (debugEnabled())
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
            TRACER.debugInfo("In " + this +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to directory server " +
              handler.getServerId() + " with generation id " +
              handler.getGenerationId() + " different from local " +
              "generation id " + generationId);
          {
            TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
                + update.getChangeNumber()
                + " will not be sent to directory server "
                + handler.getServerId() + " with generation id "
                + handler.getGenerationId() + " different from local "
                + "generation id " + generationId);
          }
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to directory server " + handler.getServerId() +
              " as it is in full update");
          {
            TRACER.debugInfo("In RS " + replicationServer.getServerId()
                + " for dn " + baseDn + ", update " + update.getChangeNumber()
                + " will not be sent to directory server "
                + handler.getServerId() + " as it is in full update");
          }
        }
        continue;
      }
      if (assuredMessage)
      {
        // Assured mode: post an assured or not assured matching update
        // message according to what has been computed for the destination
        // server
        if ((expectedServers != null) && expectedServers.contains(handler.
          getServerId()))
        {
          handler.add(update, sourceHandler);
        } else
        {
          if (notAssuredUpdate == null)
          {
            notAssuredUpdate = new NotAssuredUpdateMsg(update);
          }
          handler.add(notAssuredUpdate, sourceHandler);
        }
      } else
      {
        handler.add(update, sourceHandler);
      }
      notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
          assuredMessage, expectedServers);
    }
    // Push the message to the other subscribing handlers
    for (MessageHandler handler : otherHandlers) {
      handler.add(update, sourceHandler);
      handler.add(update);
    }
  }
  private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
      UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
      boolean assuredMessage, List<Integer> expectedServers)
      throws UnsupportedEncodingException
  {
    if (assuredMessage)
    {
      // Assured mode: post an assured or not assured matching update
      // message according to what has been computed for the destination server
      if (expectedServers.contains(handler.getServerId()))
      {
        handler.add(update);
      }
      else
      {
        if (notAssuredUpdate == null)
        {
          notAssuredUpdate = new NotAssuredUpdateMsg(update);
        }
        handler.add(notAssuredUpdate);
      }
    }
    else
    {
      handler.add(update);
    }
    return notAssuredUpdate;
  }
  /**
   * Helper class to be the return type of a method that processes a just
   * received assured update message:
@@ -503,7 +527,6 @@
       * should be not null.
       * Servers that are not in this list are servers not eligible for an ack
       * request.
       *
       */
      public List<Integer> expectedServers = null;
@@ -543,20 +566,7 @@
    {
      if (sourceHandler.isDataServer())
      {
        // Look for RS eligible for assured
        for (ReplicationServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            // No ack expected from a RS with different group id
          {
            if ((generationId > 0) &&
              (generationId == handler.getGenerationId()))
              // No ack expected from a RS with bad gen id
            {
              expectedServers.add(handler.getServerId());
            }
          }
        }
        collectRSsEligibleForAssuredReplication(groupId, expectedServers);
      }
      // Look for DS eligible for assured
@@ -574,23 +584,19 @@
          if (serverStatus == ServerStatus.NORMAL_STATUS)
          {
            expectedServers.add(handler.getServerId());
          } else
          } else if (serverStatus == ServerStatus.DEGRADED_STATUS) {
            // No ack expected from a DS with wrong status
          {
            if (serverStatus == ServerStatus.DEGRADED_STATUS)
            {
              wrongStatusServers.add(handler.getServerId());
            }
            /**
             * else
             * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
             * We do not want this to be reported as an error to the update
             * maker -> no pollution or potential misunderstanding when
             * reading logs or monitoring and it was just administration (for
             * instance new server is being configured in topo: it goes in bad
             * gen then then full full update).
             */
            wrongStatusServers.add(handler.getServerId());
          }
          /*
           * else
           * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
           * We do not want this to be reported as an error to the update
           * maker -> no pollution or potential misunderstanding when
           * reading logs or monitoring and it was just administration (for
           * instance new server is being configured in topo: it goes in bad
           * gen then full update).
           */
        }
      }
    }
@@ -644,15 +650,11 @@
        Integer.toString(replicationServer.getServerId()),
        Byte.toString(safeDataLevel), baseDn, update.toString());
      logError(errorMsg);
    } else if (sourceGroupId != groupId)
    } else if (sourceGroupId == groupId
    // Assured feature does not cross different group IDS
        && isSameGenerationId(sourceHandler.getGenerationId()))
    // Ignore assured updates from wrong generationId servers
    {
      // Assured feature does not cross different group IDS
    } else
    {
      if ((generationId > 0) &&
        (generationId == sourceHandler.getGenerationId()))
        // Ignore assured updates from wrong generationId servers
      {
        if (sourceHandler.isDataServer())
        {
          if (safeDataLevel == (byte) 1)
@@ -685,23 +687,12 @@
            sourceHandler.send(new AckMsg(cn));
          }
        }
      }
    }
    List<Integer> expectedServers = new ArrayList<Integer>();
    if (interestedInAcks && sourceHandler.isDataServer())
    {
      // Look for RS eligible for assured
      for (ReplicationServerHandler handler : replicationServers.values())
      {
        if (handler.getGroupId() == groupId
        // No ack expected from a RS with different group id
            && generationId > 0 && (generationId == handler.getGenerationId()))
        // No ack expected from a RS with bad gen id
        {
          expectedServers.add(handler.getServerId());
        }
      }
      collectRSsEligibleForAssuredReplication(groupId, expectedServers);
    }
    // Return computed structures
@@ -718,9 +709,9 @@
        // servers: the level is a best effort thing, we do not want to timeout
        // at every assured SD update for instance if a RS has had his gen id
        // reseted
        byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
        byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
          (byte)sdl : // Keep level as it was
          (byte)(nExpectedServers+1)); // Change level to match what's available
          (byte)(nExpectedServers+1); // Change level to match what's available
        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
          sourceHandler, finalSdl, expectedServers);
        preparedAssuredInfo.expectedServers = expectedServers;
@@ -735,6 +726,32 @@
    return preparedAssuredInfo;
  }
  private void collectRSsEligibleForAssuredReplication(byte groupId,
      List<Integer> expectedServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      if (handler.getGroupId() == groupId
      // No ack expected from a RS with different group id
            && isSameGenerationId(handler.getGenerationId())
        // No ack expected from a RS with bad gen id
        )
      {
        expectedServers.add(handler.getServerId());
      }
    }
  }
  private boolean isSameGenerationId(long generationId)
  {
    return this.generationId > 0 && this.generationId == generationId;
  }
  private boolean isDifferentGenerationId(long generationId)
  {
    return this.generationId > 0 && this.generationId != generationId;
  }
  /**
   * Process an ack received from a given server.
   *
@@ -810,7 +827,7 @@
    /**
     * Constructor for the timer task.
     * @param cn The changenumber of the assured update we are waiting acks for
     * @param cn The changeNumber of the assured update we are waiting acks for
     */
    public AssuredTimeoutTask(ChangeNumber cn)
    {
@@ -843,10 +860,13 @@
          AckMsg finalAck = expectedAcksInfo.createAck(true);
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
            TRACER.debugInfo(
              "In RS " + replicationServer.getServerId() + " for " + baseDn +
              ", sending timeout for assured update with change " + " number " +
              cn + " to server id " + origServer.getServerId());
          {
            TRACER.debugInfo("In RS " + replicationServer.getServerId()
                    + " for "+ baseDn
                    + ", sending timeout for assured update with change "
                    + " number " + cn + " to server id "
                    + origServer.getServerId());
          }
          try
          {
            origServer.send(finalAck);
@@ -882,37 +902,30 @@
          List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
          for (Integer serverId : serversInTimeout)
          {
            ServerHandler expectedServerInTimeout =
              directoryServers.get(serverId);
            if (expectedServerInTimeout != null)
            ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
            ServerHandler expectedRSInTimeout =
                replicationServers.get(serverId);
            if (expectedDSInTimeout != null)
            {
              // Was a DS
              if (safeRead)
              {
                expectedServerInTimeout.incrementAssuredSrSentUpdatesTimeout();
                expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout();
              } else
              {
                // No SD update sent to a DS (meaningless)
              }
            } else
            } else if (expectedRSInTimeout != null)
            {
              expectedServerInTimeout =
                replicationServers.get(serverId);
              if (expectedServerInTimeout != null)
              if (safeRead)
              {
                // Was a RS
                if (safeRead)
                {
                  expectedServerInTimeout.
                    incrementAssuredSrSentUpdatesTimeout();
                } else
                {
                  expectedServerInTimeout.
                    incrementAssuredSdSentUpdatesTimeout();
                }
                expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout();
              }
              /* else server disappeared ? Let's forget about it. */
              else
              {
                expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout();
              }
            }
            // else server disappeared ? Let's forget about it.
          }
          // Mark the ack info object as completed to prevent potential
          // processAck() code parallel run
@@ -934,7 +947,9 @@
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
      {
        stopServer(handler, false);
      }
    }
  }
@@ -989,11 +1004,12 @@
   */
  public void stopServer(ServerHandler handler, boolean shutdown)
  {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " domain=" + this + " stopServer() on the server handler " +
            handler.getMonitorInstanceName());
    if (debugEnabled())
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the server handler "
          + handler.getMonitorInstanceName());
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
@@ -1027,10 +1043,12 @@
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        {
          if (debugEnabled())
            TRACER.debugInfo("In " +
              replicationServer.getMonitorInstanceName() +
              " remote server " + handler.getMonitorInstanceName() + " is " +
              "the last RS/DS to be stopped: stopping monitoring publisher");
          {
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                + " remote server " + handler.getMonitorInstanceName()
                + " is the last RS/DS to be stopped:"
                + " stopping monitoring publisher");
          }
          stopMonitoringPublisher();
        }
@@ -1057,10 +1075,11 @@
          if (directoryServers.size() == 1)
          {
            if (debugEnabled())
              TRACER.debugInfo("In " +
                replicationServer.getMonitorInstanceName() +
                " remote server " + handler.getMonitorInstanceName() +
                " is the last DS to be stopped: stopping status analyzer");
            {
              TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                  + " remote server " + handler.getMonitorInstanceName()
                  + " is the last DS to be stopped: stopping status analyzer");
            }
            stopStatusAnalyzer();
          }
@@ -1106,10 +1125,11 @@
  public void stopServer(MessageHandler handler)
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName()
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the message handler "
          + handler.getMonitorInstanceName());
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
@@ -1176,18 +1196,19 @@
   * server currently connected in the whole topology on this domain and
   * if the generationId has never been saved.
   *
   * - test emtpyness of directoryServers list
   * - test emptiness of directoryServers list
   * - traverse replicationServers list and test for each if DS are connected
   * So it strongly relies on the directoryServers list
   */
  private void mayResetGenerationId()
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In RS " + this.replicationServer.getMonitorInstanceName() +
        " for " + baseDn + " " +
        " mayResetGenerationId generationIdSavedStatus=" +
        generationIdSavedStatus);
    {
      TRACER.debugInfo("In RS "
          + this.replicationServer.getMonitorInstanceName() + " for " + baseDn
          + " " + " mayResetGenerationId generationIdSavedStatus="
          + generationIdSavedStatus);
    }
    // If there is no more any LDAP server connected to this domain in the
    // topology and the generationId has never been saved, then we can reset
@@ -1200,35 +1221,39 @@
        if (generationId != rsh.getGenerationId())
        {
          if (debugEnabled())
            TRACER.debugInfo(
              "In RS " + this.replicationServer.getMonitorInstanceName() +
              " for " + baseDn + " " +
              " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
              " that has different genId");
        } else
        {
          if (rsh.hasRemoteLDAPServers())
          {
            TRACER.debugInfo("In RS "
                + this.replicationServer.getMonitorInstanceName() + " for "
                + baseDn + " " + " mayResetGenerationId skip RS"
                + rsh.getMonitorInstanceName() + " that has different genId");
          }
        } else if (rsh.hasRemoteLDAPServers())
        {
            lDAPServersConnectedInTheTopology = true;
            if (debugEnabled())
              TRACER.debugInfo(
                "In RS " + this.replicationServer.getMonitorInstanceName() +
                " for " + baseDn + " " +
                " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
                " has servers connected to it - will not reset generationId");
            {
              TRACER.debugInfo("In RS "
                  + this.replicationServer.getMonitorInstanceName()
                  + " for "+ baseDn + " mayResetGenerationId RS"
                  + rsh.getMonitorInstanceName()
                  + " has servers connected to it"
                  + " - will not reset generationId");
            }
            break;
          }
        }
      }
    } else
    {
      lDAPServersConnectedInTheTopology = true;
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + this.replicationServer.getMonitorInstanceName() +
          " for " + baseDn + " " +
          " has servers connected to it - will not reset generationId");
      {
        TRACER.debugInfo("In RS "
            + this.replicationServer.getMonitorInstanceName() + " for "
            + baseDn + " "
            + " has servers connected to it - will not reset generationId");
      }
    }
    if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus
@@ -1292,7 +1317,6 @@
     * The next change to send is always the first one in the tree
     * So this methods simply need to check that dependencies are OK
     * and update this replicaId RUV
     *
     */
    return handler.take();
  }
@@ -1304,14 +1328,12 @@
   */
  public Set<String> getChangelogs()
  {
    LinkedHashSet<String> mySet = new LinkedHashSet<String>();
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      mySet.add(handler.getServerAddressURL());
      results.add(handler.getServerAddressURL());
    }
    return mySet;
    return results;
  }
  /**
@@ -1333,13 +1355,12 @@
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> mySet = new ArrayList<String>(0);
    List<String> results = new ArrayList<String>(0);
    for (DataServerHandler handler : directoryServers.values())
    {
      mySet.add(String.valueOf(handler.getServerId()));
      results.add(String.valueOf(handler.getServerId()));
    }
    return mySet;
    return results;
  }
  /**
@@ -1358,7 +1379,9 @@
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
    {
      return null;
    }
    ReplicationIterator it;
    try
@@ -1386,16 +1409,15 @@
  * @param from lower limit changenumber.
  * @param to   upper limit changenumber.
  * @return the number of changes.
  *
  */
  public int getCount(int serverId,
      ChangeNumber from, ChangeNumber to)
  public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
      return 0;
    return handler.getCount(from, to);
    if (handler != null)
    {
      return handler.getCount(from, to);
    }
    return 0;
  }
  /**
@@ -1450,8 +1472,7 @@
  private List<ServerHandler> getDestinationServers(RoutableMsg msg,
    ServerHandler senderHandler)
  {
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    List<ServerHandler> servers = new ArrayList<ServerHandler>();
    if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER)
    {
@@ -1476,7 +1497,9 @@
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
        {
          continue;
        }
        servers.add(destinationHandler);
      }
    } else
@@ -1775,7 +1798,7 @@
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(this.getDbServerState());
      monitorMsg.setReplServerDbState(getDbServerState());
      return monitorMsg;
    }
    finally
@@ -1849,7 +1872,7 @@
  {
    for (DataServerHandler handler : directoryServers.values())
    {
      if ((notThisOne == null) || (handler != notThisOne))
      if (notThisOne == null || handler != notThisOne)
        // All except passed one
      {
        for (int i=1; i<=2; i++)
@@ -1934,9 +1957,7 @@
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      replicationServer.getServerURL(), generationId,
      replicationServer.getGroupId(), replicationServer.getWeight());
    RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
    rsInfos.add(localRSInfo);
    return new TopologyMsg(dsInfos, rsInfos);
@@ -1961,14 +1982,14 @@
    for (DataServerHandler serverHandler : directoryServers.values())
    {
      if (serverHandler.getServerId() == destDsId)
      {
        continue;
      }
      dsInfos.add(serverHandler.toDSInfo());
    }
    // Add our own info (local RS)
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      replicationServer.getServerURL(), generationId,
      replicationServer.getGroupId(), replicationServer.getWeight());
    RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
    rsInfos.add(localRSInfo);
    // Go through every peer RSs (and get their connected DSs), also add info
@@ -1984,6 +2005,12 @@
    return new TopologyMsg(dsInfos, rsInfos);
  }
  private RSInfo toRSInfo(ReplicationServer rs, long generationId)
  {
    return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId,
        rs.getGroupId(), rs.getWeight());
  }
  /**
   * Get the generationId associated to this domain.
   *
@@ -2059,10 +2086,11 @@
    ResetGenerationIdMsg genIdMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " + this +
          " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
          " for baseDn " + baseDn + ":\n" + genIdMsg);
    {
      TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from "
          + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n"
          + genIdMsg);
    }
    try
    {
@@ -2090,11 +2118,12 @@
      {
        // Order to take a gen id we already have, just ignore
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this
        {
          TRACER.debugInfo("In " + this
              + " Reset generation id requested for baseDn " + baseDn
              + " but generation id was already " + this.generationId
              + ":\n" + genIdMsg);
              + " but generation id was already " + this.generationId + ":\n"
              + genIdMsg);
        }
      }
      // If we are the first replication server warned,
@@ -2246,13 +2275,12 @@
      // status of a DS has to be changed. See more comments in run method of
      // StatusAnalyzer.
      if (debugEnabled())
        TRACER
            .debugInfo("Status analyzer for domain "
                + baseDn
                + " has been interrupted when"
                + " trying to acquire domain lock for changing the status"
                + " of DS "
                + serverHandler.getServerId());
      {
        TRACER.debugInfo("Status analyzer for domain " + baseDn
            + " has been interrupted when"
            + " trying to acquire domain lock for changing the status of DS "
            + serverHandler.getServerId());
      }
      return true;
    }
@@ -2273,8 +2301,7 @@
                e.getMessage()));
      }
      if ((newStatus == ServerStatus.INVALID_STATUS)
          || (newStatus == oldStatus))
      if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus)
      {
        // Change was impossible or already occurred (see StatusAnalyzer
        // comments)
@@ -2347,11 +2374,11 @@
  public boolean isDegradedDueToGenerationId(int serverId)
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " baseDN=" + baseDn +
        " isDegraded serverId=" + serverId +
        " given local generation Id=" + this.generationId);
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " baseDN=" + baseDn + " isDegraded serverId=" + serverId
          + " given local generation Id=" + this.generationId);
    }
    ServerHandler handler = replicationServers.get(serverId);
    if (handler == null)
@@ -2364,12 +2391,12 @@
    }
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " baseDN=" + baseDn +
        " Compute degradation of serverId=" + serverId +
        " LS server generation Id=" + handler.getGenerationId());
    return (handler.getGenerationId() != this.generationId);
    {
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " baseDN=" + baseDn + " Compute degradation of serverId="
          + serverId + " LS server generation Id=" + handler.getGenerationId());
    }
    return handler.getGenerationId() != this.generationId;
  }
  /**
@@ -2432,10 +2459,12 @@
        // Check if generation id has to be reseted
        mayResetGenerationId();
        if (generationId < 0)
        {
          generationId = handler.getGenerationId();
        }
      }
      if (generationId > 0 && (generationId != handler.getGenerationId()))
      if (isDifferentGenerationId(handler.getGenerationId()))
      {
        Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(handler
            .getServerId(), handler.session
@@ -2651,9 +2680,9 @@
    ServerState dbServerState = getDbServerState();
    pendingMonitorData.setRSState(replicationServer.getServerId(),
        dbServerState);
    for (int sid : dbServerState) {
      ChangeNumber storedCN = dbServerState.getChangeNumber(sid);
      pendingMonitorData.setMaxCN(sid, storedCN);
    for (int serverId : dbServerState) {
      ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
      pendingMonitorData.setMaxCN(serverId, storedCN);
    }
  }
@@ -2668,8 +2697,7 @@
   * @param serverId
   *          server handler that is receiving the message.
   */
  private void receivesMonitorDataResponse(MonitorMsg msg,
      int serverId)
  private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
  {
    synchronized (pendingMonitorDataLock)
    {
@@ -2690,8 +2718,7 @@
        pendingMonitorData.setMaxCNs(replServerState);
        // store the remote RS states.
        pendingMonitorData.setRSState(msg.getSenderID(),
            replServerState);
        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2720,24 +2747,20 @@
            {
              int connectedlsid = connectedlsh.getServerId();
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              pendingMonitorData.setFirstMissingDate(connectedlsid,
                  newfmd);
              pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
            }
          }
          else
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ReplicationServerHandler rsjHdr = replicationServers
                .get(rsid);
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            if (rsjHdr != null)
            {
              for (int remotelsid : rsjHdr
                  .getConnectedDirectoryServerIds())
              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                pendingMonitorData.setFirstMissingDate(remotelsid,
                    newfmd);
                pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
              }
            }
          }
@@ -2816,7 +2839,7 @@
  private final ReentrantLock lock = new ReentrantLock();
  /**
   * This lock is used to protect the generationid variable.
   * This lock is used to protect the generationId variable.
   */
  private final Object generationIDLock = new Object();
@@ -2984,28 +3007,21 @@
   * {@inheritDoc}
   */
  @Override
  public ArrayList<Attribute> getMonitorData()
  public List<Attribute> getMonitorData()
  {
    /*
     * publish the server id and the port number.
     */
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    // publish the server id and the port number.
    List<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(Attributes.create("replication-server-id",
        String.valueOf(replicationServer.getServerId())));
    attributes.add(Attributes.create("replication-server-port",
        String.valueOf(replicationServer.getReplicationPort())));
    /*
     * Add all the base DNs that are known by this replication server.
     */
    AttributeBuilder builder = new AttributeBuilder("domain-name");
    builder.add(baseDn);
    attributes.add(builder.toAttribute());
    // Add all the base DNs that are known by this replication server.
    attributes.add(Attributes.create("domain-name", baseDn));
    // Publish to monitor the generation ID by replicationServerDomain
    builder = new AttributeBuilder("generation-id");
    builder.add(baseDn + " " + generationId);
    attributes.add(builder.toAttribute());
    attributes.add(Attributes.create("generation-id",
        baseDn + " " + generationId));
    MonitorData md = getDomainMonitorData();
@@ -3045,7 +3061,7 @@
  {
    if (ctHeartbeatState == null)
    {
      ctHeartbeatState = this.getDbServerState().duplicate();
      ctHeartbeatState = getDbServerState().duplicate();
    }
    return ctHeartbeatState;
  }
@@ -3053,6 +3069,7 @@
  /**
   * Computes the eligible server state for the domain.
   *
   * <pre>
   *     s1               s2          s3
   *     --               --          --
   *                                 cn31
@@ -3062,6 +3079,7 @@
   *     cn14
   *                     cn26
   *     cn13
   * </pre>
   *
   * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
   *
@@ -3070,7 +3088,7 @@
   */
  public ServerState getEligibleState(ChangeNumber eligibleCN)
  {
    ServerState dbState = this.getDbServerState();
    ServerState dbState = getDbServerState();
    // The result is initialized from the dbState.
    // From it, we don't want to keep the changes newer than eligibleCN.
@@ -3078,9 +3096,10 @@
    if (eligibleCN != null)
    {
      for (int sid : dbState) {
        DbHandler h = sourceDbHandlers.get(sid);
        ChangeNumber mostRecentDbCN = dbState.getChangeNumber(sid);
      for (int serverId : dbState)
      {
        DbHandler h = sourceDbHandlers.get(serverId);
        ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId);
        try {
          // Is the most recent change in the Db newer than eligible CN ?
          // if yes (like cn15 in the example above, then we have to go back
@@ -3090,13 +3109,13 @@
            ReplicationIterator ri = null;
            try {
              ri = h.generateIterator(eligibleCN);
              if ((ri != null) && (ri.getChange() != null)) {
              if (ri != null && ri.getChange() != null) {
                ChangeNumber newCN = ri.getChange().getChangeNumber();
                result.update(newCN);
              }
            } catch (Exception e) {
              // there's no change older than eligibleCN (case of s3/cn31)
              result.update(new ChangeNumber(0, 0, sid));
              result.update(new ChangeNumber(0, 0, serverId));
            } finally {
              if (ri != null) {
                ri.releaseCursor();
@@ -3117,8 +3136,10 @@
    }
    if (debugEnabled())
      TRACER.debugInfo("In " + this
        + " getEligibleState() result is " + result);
    {
      TRACER
          .debugInfo("In " + this + " getEligibleState() result is " + result);
    }
    return result;
  }
@@ -3154,11 +3175,11 @@
    for (DbHandler db : sourceDbHandlers.values())
    {
      // Consider this producer (DS/db).
      int sid = db.getServerId();
      int serverId = db.getServerId();
      // Should it be considered for eligibility ?
      ChangeNumber heartbeatLastCN =
        getChangeTimeHeartbeatState().getChangeNumber(sid);
        getChangeTimeHeartbeatState().getChangeNumber(serverId);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
@@ -3174,31 +3195,32 @@
      }
      */
      boolean sidConnected = false;
      if (directoryServers.containsKey(sid))
      boolean serverIdConnected = false;
      if (directoryServers.containsKey(serverId))
      {
        sidConnected = true;
        serverIdConnected = true;
      }
      else
      {
        // not directly connected
        for (ReplicationServerHandler rsh : replicationServers.values())
        {
          if (rsh.isRemoteLDAPServer(sid))
          if (rsh.isRemoteLDAPServer(serverId))
          {
            sidConnected = true;
            serverIdConnected = true;
            break;
          }
        }
      }
      if (!sidConnected)
      if (!serverIdConnected)
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + "Replication Server " +
            replicationServer.getReplicationPort() + " " +
            baseDn + " " + replicationServer.getServerId() +
            " Server " + sid
            + " is not considered for eligibility ... potentially down");
        {
          TRACER.debugInfo("In " + "Replication Server "
              + replicationServer.getReplicationPort() + " " + baseDn + " "
              + replicationServer.getServerId() + " Server " + serverId
              + " is not considered for eligibility ... potentially down");
        }
        continue;
      }
@@ -3216,10 +3238,12 @@
    }
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + "Replication Server " + replicationServer.getReplicationPort() +
        " " + baseDn + " " + replicationServer.getServerId() +
        " getEligibleCN() returns result =" + eligibleCN);
    {
      TRACER.debugInfo("In Replication Server "
          + replicationServer.getReplicationPort() + " " + baseDn + " "
          + replicationServer.getServerId()
          + " getEligibleCN() returns result =" + eligibleCN);
    }
    return eligibleCN;
  }
@@ -3254,8 +3278,7 @@
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : replicationServers
            .values())
        for (ReplicationServerHandler rsHandler : replicationServers.values())
        {
          try
          {
@@ -3292,19 +3315,6 @@
    // TODO:May be we can spare processing by only storing CN (timestamp)
    // instead of a server state.
    getChangeTimeHeartbeatState().update(cn);
    /*
    if (debugEnabled())
    {
      Set<String> ss = ctHeartbeatState.toStringSet();
      String dss = "";
      for (String s : ss)
      {
        dss = dss + " \\ " + s;
      }
      TRACER.debugInfo("In " + this.getName() + " " + dss);
    }
    */
  }
  /**
@@ -3319,20 +3329,22 @@
  {
    long res = 0;
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    for (int sid : dbState) {
      // process one sid
    for (int serverId : getDbServerState())
    {
      ChangeNumber startCN = null;
      if (startState.getChangeNumber(sid) != null)
        startCN = startState.getChangeNumber(sid);
      long sidRes = getCount(sid, startCN, endCN);
      if (startState.getChangeNumber(serverId) != null)
      {
        startCN = startState.getChangeNumber(serverId);
      }
      long serverIdRes = getCount(serverId, startCN, endCN);
      // The startPoint is excluded when counting the ECL eligible changes
      if ((startCN != null) && (sidRes > 0))
        sidRes--;
      if (startCN != null && serverIdRes > 0)
      {
        serverIdRes--;
      }
      res += sidRes;
      res += serverIdRes;
    }
    return res;
  }
@@ -3348,14 +3360,10 @@
  public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN)
  {
    long res = 0;
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    for (int sid : dbState) {
      // process one sid
    for (int serverId : getDbServerState()) {
      ChangeNumber lStartCN =
          new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), sid);
      res += getCount(sid, lStartCN, endCN);
          new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId);
      res += getCount(serverId, lStartCN, endCN);
    }
    return res;
  }
@@ -3370,7 +3378,7 @@
    long latest = 0;
    for (DbHandler db : sourceDbHandlers.values())
    {
      if ((latest==0) || (latest<db.getLatestTrimDate()))
      if (latest == 0 || latest < db.getLatestTrimDate())
      {
        latest = db.getLatestTrimDate();
      }