mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.06.2013 5c3362e4dd1ee79c10160cfd128b70cdb9a2dee1
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -806,9 +806,8 @@
       * out which one is the best to connect to.
       */
      if (debugEnabled())
        TRACER.debugInfo("serverId: " + serverId
          + " phase 1 : will perform PhaseOneH with each RS in "
          + " order to elect the preferred one");
        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to "
            + "elect the preferred one");
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
@@ -818,14 +817,14 @@
      if (replicationServerInfos.size() > 0)
      {
        // At least one server answered, find the best one.
        electedRsInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, getGroupId(), getGenerationID());
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            replicationServerInfos, serverId, getGroupId(), getGenerationID());
        electedRsInfo = evals.getBestRS();
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo("serverId: " + serverId
            + " phase 2 : will perform PhaseOneH with the preferred RS="
            + electedRsInfo);
          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
              + electedRsInfo);
        electedRsInfo = performPhaseOneHandshake(
          electedRsInfo.getServerURL(), true, false);
@@ -1056,8 +1055,7 @@
      int nChanges = ServerState.diffChanges(rsState, state);
      if (debugEnabled())
      {
        TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
            + getServerId() + " computed " + nChanges + " changes late.");
        debugInfo("computed " + nChanges + " changes late.");
      }
      /*
@@ -1136,8 +1134,8 @@
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + serverStartMsg + "\nAND RECEIVED:\n" + msg);
        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
            + msg);
      }
      // Wrap received message in a server info object
@@ -1253,8 +1251,7 @@
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + startECLSessionMsg);
        TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
      }
      // Alright set the timeout to the desired value
@@ -1320,8 +1317,8 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
        TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
            + "\nAND RECEIVED:\n" + topologyMsg);
      }
      // Alright set the timeout to the desired value
@@ -1342,13 +1339,203 @@
  }
  /**
   * Class holding evaluation results for electing the best replication server
   * for the local directory server.
   */
  static class RSEvaluations
  {
    private final int localServerId;
    private Map<Integer, ReplicationServerInfo> bestRSs;
    private final Map<Integer, Message> rsEvals =
        new HashMap<Integer, Message>();
    /**
     * Ctor.
     *
     * @param localServerId
     *          the serverId for the local directory server
     * @param rsInfos
     *          a Map of serverId => {@link ReplicationServerInfo} with all the
     *          candidate replication servers
     */
    RSEvaluations(int localServerId,
        Map<Integer, ReplicationServerInfo> rsInfos)
    {
      this.localServerId = localServerId;
      this.bestRSs = rsInfos;
    }
    private boolean keepBest(LocalEvaluation eval)
    {
      if (eval.hasAcceptedAny())
      {
        bestRSs = eval.getAccepted();
        rsEvals.putAll(eval.getRejected());
        return true;
      }
      return false;
    }
    /**
     * Sets the elected best replication server, rejecting all the other
     * replication servers with the supplied evaluation.
     *
     * @param bestRsId
     *          the serverId of the elected replication server
     * @param rejectedRSsEval
     *          the evaluation for all the rejected replication servers
     */
    private void setBestRS(int bestRsId, Message rejectedRSsEval)
    {
      for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
          this.bestRSs.entrySet().iterator(); it.hasNext();)
      {
        final Entry<Integer, ReplicationServerInfo> entry = it.next();
        final Integer rsId = entry.getKey();
        final ReplicationServerInfo rsInfo = entry.getValue();
        if (rsInfo.getServerId() != bestRsId)
        {
          it.remove();
        }
        rsEvals.put(rsId, rejectedRSsEval);
      }
    }
    private void discardAll(Message eval)
    {
      for (Integer rsId : bestRSs.keySet())
      {
        rsEvals.put(rsId, eval);
      }
    }
    private boolean foundBestRS()
    {
      return bestRSs.size() == 1;
    }
    /**
     * Returns the {@link ReplicationServerInfo} for the best replication
     * server.
     *
     * @return the {@link ReplicationServerInfo} for the best replication server
     */
    ReplicationServerInfo getBestRS()
    {
      if (foundBestRS())
      {
        return bestRSs.values().iterator().next();
      }
      return null;
    }
    /**
     * Returns the evaluations for all the candidate replication servers.
     *
     * @return a Map of serverId => Message containing the evaluation for each
     *         candidate replication servers.
     */
    Map<Integer, Message> getEvaluations()
    {
      if (foundBestRS())
      {
        final Integer bestRSServerId = getBestRS().getServerId();
        if (rsEvals.get(bestRSServerId) == null)
        {
          final Message eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
          rsEvals.put(bestRSServerId, eval);
        }
      }
      return Collections.unmodifiableMap(rsEvals);
    }
    /**
     * Returns the evaluation for the supplied replication server Id.
     * <p>
     * Note: "unknown RS" message is returned if the supplied replication server
     * was not part of the candidate replication servers.
     *
     * @param rsServerId
     *          the supplied replication server Id
     * @return the evaluation {@link Message} for the supplied replication
     *         server Id
     */
    private Message getEvaluation(int rsServerId)
    {
      final Message evaluation = getEvaluations().get(rsServerId);
      if (evaluation != null)
      {
        return evaluation;
      }
      return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return new StringBuilder()
          .append("Current best replication server Ids: ").append(
              this.bestRSs.keySet()).append(
              ", Evaluation of connected replication servers").append(
              " (ServerId => Evaluation): ").append(this.rsEvals.keySet())
          .append(", Any replication server not appearing here").append(
              " could not be contacted.").toString();
    }
  }
  /**
   * Evaluation local to one filter.
   */
  private static class LocalEvaluation
  {
    private final Map<Integer, ReplicationServerInfo> filteredRSs =
        new HashMap<Integer, ReplicationServerInfo>();
    private final Map<ReplicationServerInfo, Message> rsEvals =
        new HashMap<ReplicationServerInfo, Message>();
    private void accept(Integer rsId, ReplicationServerInfo rsInfo)
    {
      this.rsEvals.remove(rsInfo); // undo reject
      this.filteredRSs.put(rsId, rsInfo);
    }
    private void reject(ReplicationServerInfo rsInfo, Message reason)
    {
      this.filteredRSs.remove(rsInfo.getServerId()); // undo accept
      this.rsEvals.put(rsInfo, reason);
    }
    private Map<Integer, ReplicationServerInfo> getAccepted()
    {
      return filteredRSs;
    }
    public Map<Integer, Message> getRejected()
    {
      final Map<Integer, Message> result = new HashMap<Integer, Message>();
      for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet())
      {
        result.put(entry.getKey().getServerId(), entry.getValue());
      }
      return result;
    }
    private boolean hasAcceptedAny()
    {
      return !filteredRSs.isEmpty();
    }
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it or determine if we must disconnect from current one to
   * re-connect to best server.
   *
   * <p>
   * Note: this method is static for test purpose (access from unit tests)
   *
   *
   * @param firstConnection True if we run this method for the very first
   * connection of the broker. False if we run this method to determine if the
   * replication server we are currently connected to is still the best or not.
@@ -1365,15 +1552,16 @@
   * disconnect (so the best replication server is another one than the current
   * one). Null can only be returned when firstConnection is false.
   */
  public static ReplicationServerInfo computeBestReplicationServer(
  public static RSEvaluations computeBestReplicationServer(
      boolean firstConnection, int rsServerId, ServerState myState,
      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
      byte groupId, long generationId)
  {
    final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
    // Shortcut, if only one server, this is the best
    if (rsInfos.size() == 1)
    if (evals.foundBestRS())
    {
      return rsInfos.values().iterator().next();
      return evals;
    }
    /**
@@ -1386,8 +1574,6 @@
     *   local DS
     * - replication server in the same VM as local DS one
     */
    // TODO JNR log why an RS was evicted as best server
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    /*
    The list of best replication servers is filtered with each criteria. At
    each criteria, the list is replaced with the filtered one if there
@@ -1398,109 +1584,96 @@
    the local configuration. When the current method is called, for
    sure, at least one server from the list is locally configured
    */
    bestServers =
        keepBest(filterServersLocallyConfigured(bestServers), bestServers);
    filterServersLocallyConfigured(evals, localServerId);
    // Some servers with same group id ?
    bestServers =
        keepBest(filterServersWithSameGroupId(bestServers, groupId),
            bestServers);
    filterServersWithSameGroupId(evals, localServerId, groupId);
    // Some servers with same generation id ?
    Map<Integer, ReplicationServerInfo> sameGenerationId =
        filterServersWithSameGenerationId(bestServers, generationId);
    if (sameGenerationId.size() > 0)
    final boolean rssWithSameGenerationIdExist =
        filterServersWithSameGenerationId(evals, localServerId, generationId);
    if (rssWithSameGenerationIdExist)
    {
      // If some servers with the right generation id this is useful to
      // run the local DS change criteria
      bestServers =
          keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
              myState, localServerId), sameGenerationId);
      filterServersWithAllLocalDSChanges(evals, myState, localServerId);
    }
    // Some servers in the local VM or local host?
    bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
    filterServersOnSameHost(evals, localServerId);
    /**
     * Now apply the choice base on the weight to the best servers list
     */
    if (bestServers.size() == 1)
    if (evals.foundBestRS())
    {
      return bestServers.values().iterator().next();
      return evals;
    }
    /**
     * Now apply the choice based on the weight to the best servers list
     */
    if (firstConnection)
    {
      // We are not connected to a server yet
      return computeBestServerForWeight(bestServers, -1, -1);
      computeBestServerForWeight(evals, -1, -1);
    }
    /*
     * We are already connected to a RS: compute the best RS as far as the
     * weights is concerned. If this is another one, some DS must disconnect.
     */
    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
  }
  /**
   * If the filtered Map is not empty then it is returned, else return the
   * original unfiltered Map.
   *
   * @return the best fit Map between the filtered Map and the original
   * unfiltered Map.
   */
  private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
      Map<K, V> unfilteredMap)
  {
    if (!filteredMap.isEmpty())
    else
    {
      return filteredMap;
      /*
       * We are already connected to a RS: compute the best RS as far as the
       * weights is concerned. If this is another one, some DS must disconnect.
       */
      computeBestServerForWeight(evals, rsServerId, localServerId);
    }
    return unfilteredMap;
    return evals;
  }
  /**
   * Creates a new list that contains only replication servers that are locally
   * configured.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers locally configured
   * @param evals The evaluation object
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersLocallyConfigured(Map<Integer,
    ReplicationServerInfo> bestServers)
  private static void filterServersLocallyConfigured(RSEvaluations evals,
      int localServerId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      ReplicationServerInfo rsInfo = entry.getValue();
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.isLocallyConfigured())
      {
        result.put(entry.getKey(), rsInfo);
        eval.accept(rsId, rsInfo);
      }
      else
      {
        eval.reject(rsInfo,
            NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
      }
    }
    return result;
    evals.keepBest(eval);
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * passed group id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param evals The evaluation object
   * @param groupId The group id that must match
   * @return The sub list of replication servers matching the requested group id
   * (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGroupId(Map<Integer,
    ReplicationServerInfo> bestServers, byte groupId)
  private static void filterServersWithSameGroupId(RSEvaluations evals,
      int localServerId, byte groupId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      ReplicationServerInfo rsInfo = entry.getValue();
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.getGroupId() == groupId)
      {
        result.put(entry.getKey(), rsInfo);
        eval.accept(rsId, rsInfo);
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
            rsId, rsInfo.getGroupId(), localServerId, groupId));
      }
    }
    return result;
    evals.keepBest(eval);
  }
  /**
@@ -1510,28 +1683,37 @@
   * then the 'empty'(generationId==-1) replication servers are also included
   * in the result list.
   *
   * @param bestServers  The list of replication servers to filter
   * @param evals The evaluation object
   * @param generationId The generation id that must match
   * @return The sub list of replication servers matching the requested
   * generation id (which may be empty)
   * @return whether some replication server passed the filter
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGenerationId(Map<Integer,
    ReplicationServerInfo> bestServers, long generationId)
  private static boolean filterServersWithSameGenerationId(
      RSEvaluations evals, long localServerId, long generationId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    final LocalEvaluation eval = new LocalEvaluation();
    boolean emptyState = true;
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo rsInfo = entry.getValue();
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.getGenerationId() == generationId)
      {
        result.put(entry.getKey(), rsInfo);
        eval.accept(rsId, rsInfo);
        if (!rsInfo.serverState.isEmpty())
          emptyState = false;
      }
      else if (rsInfo.getGenerationId() == -1)
      {
        eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
            generationId, localServerId));
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
            rsId, rsInfo.getGenerationId(), localServerId, generationId));
      }
    }
    if (emptyState)
@@ -1543,38 +1725,27 @@
        ReplicationServerInfo rsInfo = entry.getValue();
        if (rsInfo.getGenerationId() == -1)
        {
          result.put(entry.getKey(), rsInfo);
          // will undo the reject of previously rejected RSs
          eval.accept(entry.getKey(), rsInfo);
        }
      }
    }
    return result;
    return evals.keepBest(eval);
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * latest changes from the passed DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param evals The evaluation object
   * @param localState The state of the local DS
   * @param localServerId The server id to consider for the changes
   * @return The sub list of replication servers that have the latest changes
   * from the passed DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithAllLocalDSChanges(Map<Integer,
    ReplicationServerInfo> bestServers, ServerState localState,
    int localServerId)
  private static void filterServersWithAllLocalDSChanges(
      RSEvaluations evals, ServerState localState, int localServerId)
  {
    Map<Integer, ReplicationServerInfo> upToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    // Extract the CSN of the latest change generated by the local server
    CSN myCSN = localState.getCSN(localServerId);
    if (myCSN == null)
    {
      myCSN = new CSN(0, 0, localServerId);
    }
    final CSN localCSN = getCSN(localState, localServerId);
    /**
     * Find replication servers that are up to date (or more up to date than us,
@@ -1583,60 +1754,96 @@
     * server id. If some servers are more up to date, prefer this list but take
     * only the latest CSN.
     */
    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
    boolean foundRSMoreUpToDateThanLocalDS = false;
    CSN latestRsCSN = null;
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
      if (rsCSN == null)
      {
        rsCSN = new CSN(0, 0, localServerId);
      }
      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
      // Has this replication server the latest local change ?
      if (myCSN.isOlderThanOrEqualTo(rsCSN))
      if (rsCSN.isOlderThan(localCSN))
      {
        if (myCSN.equals(rsCSN))
        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
      }
      else if (rsCSN.equals(localCSN))
      {
        // This replication server has exactly the latest change from the
        // local server
        if (!foundRSMoreUpToDateThanLocalDS)
        {
          // This replication server has exactly the latest change from the
          // local server
          upToDateServers.put(rsId, rsInfo);
        } else
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else
        {
          // This replication server is even more up to date than the local
          // server
          if (latestRsCSN == null)
          {
            // Initialize the latest CSN
            latestRsCSN = rsCSN;
          }
          if (rsCSN.isNewerThanOrEqualTo(latestRsCSN))
          {
            if (rsCSN.equals(latestRsCSN))
            {
              moreUpToDateServers.put(rsId, rsInfo);
            } else
            {
              // This RS is even more up to date, clear the list and store this
              // new RS
              moreUpToDateServers.clear();
              moreUpToDateServers.put(rsId, rsInfo);
              latestRsCSN = rsCSN;
            }
          }
          mostUpToDateEval.reject(rsInfo,
            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
      else if (rsCSN.isNewerThan(localCSN))
      {
        // This replication server is even more up to date than the local server
        if (latestRsCSN == null)
        {
          foundRSMoreUpToDateThanLocalDS = true;
          // all previous results are now outdated, reject them all
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          // Initialize the latest CSN
          latestRsCSN = rsCSN;
        }
        if (rsCSN.equals(latestRsCSN))
        {
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else if (rsCSN.isNewerThan(latestRsCSN))
        {
          // This RS is even more up to date, reject all previously accepted RSs
          // and store this new RS
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          mostUpToDateEval.accept(rsId, rsInfo);
          latestRsCSN = rsCSN;
        }
        else
        {
          mostUpToDateEval.reject(rsInfo,
            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
    }
    if (moreUpToDateServers.size() > 0)
    {
      // Prefer servers more up to date than local server
      return moreUpToDateServers;
    }
    return upToDateServers;
    evals.keepBest(mostUpToDateEval);
  }
  private static CSN getCSN(ServerState localState, int localServerId)
  {
    final CSN csn = localState.getCSN(localServerId);
    if (csn != null)
    {
      return csn;
    }
    return new CSN(0, 0, localServerId);
  }
  private static void rejectAllWithRSIsLaterThanBestRS(
      final LocalEvaluation eval, int localServerId, CSN localCSN)
  {
    for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
    {
      final String rsCSN =
          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
      final Message reason =
          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
            rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
      eval.reject(rsInfo, reason);
    }
  }
  /**
   * Creates a new list that contains only replication servers that are on the
@@ -1644,22 +1851,18 @@
   * method will gives priority to any replication server which is in the same
   * VM as this DS.
   *
   * @param bestServers
   *          The list of replication servers to filter
   * @return The sub list of replication servers being on the same host as the
   *         local DS (which may be empty)
   * @param evals The evaluation object
   */
  private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
      Map<Integer, ReplicationServerInfo> bestServers)
  private static void filterServersOnSameHost(RSEvaluations evals,
      int localServerId)
  {
    /*
     * Initially look for all servers on the same host. If we find one in the
     * same VM, then narrow the search.
     */
    boolean foundRSInSameVM = false;
    final Map<Integer, ReplicationServerInfo> result =
        new HashMap<Integer, ReplicationServerInfo>();
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
@@ -1672,24 +1875,41 @@
          {
            // An RS in the same VM will always have priority.
            // Narrow the search to only include servers in this VM.
            result.clear();
            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
            foundRSInSameVM = true;
          }
          result.put(rsId, rsInfo);
          eval.accept(rsId, rsInfo);
        }
        else if (!foundRSInSameVM)
        {
          // OK, accept RSs on the same machine because we have not found an RS
          // in the same VM yet
          result.put(rsId, rsInfo);
          eval.accept(rsId, rsInfo);
        }
        else
        {
          // Skip: we have found some RSs in the same VM, but this RS is not.
          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
              localServerId));
        }
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
            localServerId));
      }
    }
    return result;
    evals.keepBest(eval);
  }
  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
      int localServerId)
  {
    for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
    {
      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
          rsInfo.getServerId(), localServerId));
    }
  }
  /**
@@ -1699,23 +1919,18 @@
   * Warning: This method is expected to be called with at least 2 servers in
   * bestServers
   * Note: this method is static for test purpose (access from unit tests)
   * @param bestServers The list of replication servers to consider
   * @param evals The evaluation object
   * @param currentRsServerId The replication server the local server is
   *        currently connected to. -1 if the local server is not yet connected
   *        to any replication server.
   * @param localServerId The server id of the local server. This is not used
   *        when it is not connected to a replication server
   *        (currentRsServerId = -1)
   * @return The replication server the local server should be connected to
   * as far as the weight is concerned. This may be the currently used one if
   * the weight is correctly spread. If the returned value is null, the best
   * replication server is undetermined but the local server must disconnect
   * (so the best replication server is another one than the current one).
   */
  public static ReplicationServerInfo computeBestServerForWeight(
    Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
    int localServerId)
  public static void computeBestServerForWeight(RSEvaluations evals,
      int currentRsServerId, int localServerId)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    /*
     * - Compute the load goal of each RS, deducing it from the weights affected
     * to them.
@@ -1765,18 +1980,21 @@
    {
      // The local server is not connected yet, find best server to connect to,
      // taking the weights into account.
      return computeBestServerWhenNotConnected(bestServers, loadDistances);
      computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
    }
    // The local server is currently connected to a RS, let's see if it must
    // disconnect or not, taking the weights into account.
    return computeBestServerWhenConnected(bestServers, loadDistances,
        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
    else
    {
      // The local server is currently connected to a RS, let's see if it must
      // disconnect or not, taking the weights into account.
      computeBestServerWhenConnected(evals, loadDistances, localServerId,
          currentRsServerId, sumOfWeights, sumOfConnectedDSs);
    }
  }
  private static ReplicationServerInfo computeBestServerWhenNotConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances)
  private static void computeBestServerWhenNotConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    /*
     * Find the server with the current highest distance to its load goal and
     * choose it. Make an exception if every server is correctly balanced,
@@ -1815,14 +2033,15 @@
      // Choose server with the highest weight
      bestRsId = highestWeightRsId;
    }
    return bestServers.get(bestRsId);
    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
        bestRsId));
  }
  private static ReplicationServerInfo computeBestServerWhenConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
  private static void computeBestServerWhenConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
@@ -1928,45 +2147,67 @@
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            return bestServers.get(currentRsServerId);
            evals.setBestRS(currentRsServerId,
                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
            return;
          }
        }
        // Prepare a sorted list (from lowest to highest) or DS server ids
        // connected to the current RS
        ReplicationServerInfo currentRsInfo =
          bestServers.get(currentRsServerId);
        List<Integer> serversConnectedToCurrentRS =
            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
        Collections.sort(serversConnectedToCurrentRS);
        // Go through the list of DSs to disconnect and see if the local
        // server is part of them.
        int index = 0;
        while (overloadingDSsNumber > 0)
            bestServers.get(currentRsServerId);
        if (isServerOverloadingRS(localServerId, currentRsInfo,
            overloadingDSsNumber))
        {
          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
          if (serverIdToDisconnect == localServerId)
          {
            // The local server is part of the DSs to disconnect
            return null;
          }
          overloadingDSsNumber--;
          index++;
          // The local server is part of the DSs to disconnect
          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
              localServerId, currentRsServerId));
        }
        // The local server is not part of the servers to disconnect from the
        // current RS.
        else
        {
          // The local server is not part of the servers to disconnect from the
          // current RS.
          evals.setBestRS(currentRsServerId,
              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
                  currentRsServerId));
        }
      } else {
        // The average distance of the other RSs does not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
        evals.setBestRS(currentRsServerId,
            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
                currentRsServerId));
      }
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
      evals.setBestRS(currentRsServerId,
          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
              currentRsServerId));
    }
    return bestServers.get(currentRsServerId);
  }
  /**
   * Returns whether the local DS is overloading the RS.
   * <p>
   * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
   * DSs connected to this RS is ordered by serverId to use a consistent
   * ordering across all nodes in the topology. The serverIds which index in the
   * List are lower than "overloadingDSsNumber" will be evicted first.
   * <p>
   * This ordering is unfair since nodes with the lower serverIds will be
   * evicted more often than nodes with higher serverIds. However, it is a
   * consistent and reliable ordering applicable anywhere in the topology.
   */
  private static boolean isServerOverloadingRS(int localServerId,
      ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
  {
    List<Integer> serversConnectedToCurrentRS =
        new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
    Collections.sort(serversConnectedToCurrentRS);
    final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
    return idx != -1 && idx < overloadingDSsNumber;
  }
  /**
@@ -2069,8 +2310,8 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(this + " end restart : connected=" + connected
        + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
      debugInfo("end restart : connected=" + connected + " with RS("
          + getRsServerId() + ") genId=" + this.generationID);
    }
  }
@@ -2129,8 +2370,8 @@
        if (debugEnabled())
        {
          TRACER.debugInfo("ReplicationBroker.publish() Publishing a "
            + "message is not possible due to existing connection error.");
          debugInfo("publish(): Publishing a message is not possible due to"
              + " existing connection error.");
        }
        return false;
@@ -2232,8 +2473,8 @@
            // ignore
            if (debugEnabled())
            {
              TRACER.debugInfo("ReplicationBroker.publish() "
                + "Interrupted exception raised : " + e.getLocalizedMessage());
              debugInfo("publish(): Interrupted exception raised : "
                  + e.getLocalizedMessage());
            }
          }
        }
@@ -2242,8 +2483,8 @@
        // just loop.
        if (debugEnabled())
        {
          TRACER.debugInfo("ReplicationBroker.publish() "
            + "Interrupted exception raised." + e.getLocalizedMessage());
          debugInfo("publish(): Interrupted exception raised."
              + e.getLocalizedMessage());
        }
      }
    }
@@ -2393,10 +2634,10 @@
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              final ReplicationServerInfo bestServerInfo =
              final RSEvaluations evals =
                  computeBestReplicationServer(false, previousRsServerID, state,
                      replicationServerInfos, serverId, getGroupId(),
                      generationID);
                  replicationServerInfos, serverId, getGroupId(), generationID);
              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
@@ -2413,14 +2654,19 @@
                }
                else
                {
                  // TODO JNR log why an RS was evicted as best server
                  final int bestRsServerId = bestServerInfo.getServerId();
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      localSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
                      bestRsServerId,
                      baseDN.toNormalizedString(),
                      evals.getEvaluation(previousRsServerID).toString(),
                      evals.getEvaluation(bestRsServerId).toString());
                }
                logError(message);
                if (debugEnabled())
                  debugInfo("best replication servers evaluation results: "
                      + evals);
                reStart(true);
              }
@@ -2472,14 +2718,12 @@
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication Server to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
   * currently connected and
   * Gets the States of all the Replicas currently in the Topology. When this
   * method is called, a Monitoring message will be sent to the Replication
   * Server to which this domain is currently connected so that it computes a
   * table containing information about all Directory Servers in the topology.
   * This Computation involves communications will all the servers currently
   * connected and
   *
   * @return The States of all Replicas in the topology (except us)
   */
@@ -2539,9 +2783,8 @@
  public void stop()
  {
    if (debugEnabled())
      TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
          + " and will close the connection to replication server "
          + rsServerId + " for domain " + getBaseDN());
      debugInfo("is stopping and will close the connection to"
          + " replication server " + rsServerId);
    synchronized (startStopLock)
    {
@@ -2786,7 +3029,7 @@
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
      debugInfo("receive TopologyMsg=" + topoMsg);
    // Store new DS list
    dsList = topoMsg.getDsList();
@@ -2876,8 +3119,7 @@
    else
    {
      if (debugEnabled())
        TRACER.debugInfo(this
          + " is not configured to send CSN heartbeat interval");
        debugInfo("is not configured to send CSN heartbeat interval");
    }
  }
@@ -3003,4 +3245,10 @@
    }
    return sb.toString();
  }
  private void debugInfo(String message)
  {
    TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
        + " and serverId=" + getServerId() + " " + message);
  }
}