mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.06.2013 5c3362e4dd1ee79c10160cfd128b70cdb9a2dee1
opends/src/messages/messages/replication.properties
@@ -406,7 +406,8 @@
 server will now try to connect to another replication server
NOTICE_NEW_BEST_REPLICATION_SERVER_188=Directory Server DS(%d) is switching \
 from replication server RS(%d) at %s to RS(%d) for domain "%s" because it is \
 more suitable
 more suitable. The previous replication server evaluation was: "%s", and the \
 new replication server evaluation was: "%s"
NOTICE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START_141=Starting total update: \
 importing domain "%s" from remote directory server DS(%d) to this directory \
 server DS(%d)
@@ -495,4 +496,34 @@
 after having successfully read the oldest. Database might have been cleaned or \
 closed between successive reads
SEVERE_WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED_218=Cannot \
 open database %s because shutdown was requested from replication server RS(%d)
 open database %s because shutdown was requested from replication server RS(%d)
NOTICE_RS_NOT_LOCALLY_CONFIGURED_219=RS(%d) was not configured locally on DS(%d), \
 but at least one other RS was
NOTICE_RS_HAS_NO_GENERATION_ID_220=RS(%d) has no generation Id, but at least one \
 other RS has the same generation Id %d as DS(%d)
NOTICE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS_221=RS(%d) generation Id %d does not \
 match DS(%d) generation Id %d, but at least another RS does
NOTICE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS_222=RS(%d) groupId %d does not match \
 DS(%d) groupId %d, but at least another RS does
NOTICE_RS_LATER_THAN_LOCAL_DS_223=RS(%d) newest change %s is behind DS(%d) \
 newest change %s, but at least another RS is at the same point or ahead of the DS
NOTICE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS_224=RS(%d) newest \
 change %s is behind another RS which is ahead of DS(%d) newest change %s
NOTICE_RS_ON_DIFFERENT_VM_THAN_DS_225=RS(%d) is on the same host, but a different \
 virtual machine than DS(%d), but at least another RS is
NOTICE_RS_ON_DIFFERENT_HOST_THAN_DS_226=RS(%d) is on a different host than DS(%d), \
 but at least another RS is on the same host
NOTICE_DISCONNECT_DS_FROM_OVERLOADED_RS_227=DS(%d) disconnected from overloaded RS(%d)
NOTICE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS_228=DS(%d) not disconnected from \
 overloaded RS(%d), other DSs will disconnect
NOTICE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS_229=DS(%d) not disconnected from \
 current RS(%d), since there is no need to rebalance all directory servers to \
 other replication servers in the topology.
NOTICE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS_230=DS(%d) not disconnected \
 from current RS(%d), because RS is underloaded or its load goal is reached
NOTICE_BIGGEST_WEIGHT_RS_231=DS(%d) will connect to RS(%d) because it has the \
 biggest weight among all the replication servers
NOTICE_AVOID_YOYO_EFFECT_232=DS(%d) stayed connected to RS(%d) to avoid the yoyo effect
NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
 for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -806,9 +806,8 @@
       * out which one is the best to connect to.
       */
      if (debugEnabled())
        TRACER.debugInfo("serverId: " + serverId
          + " phase 1 : will perform PhaseOneH with each RS in "
          + " order to elect the preferred one");
        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to "
            + "elect the preferred one");
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
@@ -818,14 +817,14 @@
      if (replicationServerInfos.size() > 0)
      {
        // At least one server answered, find the best one.
        electedRsInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, getGroupId(), getGenerationID());
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            replicationServerInfos, serverId, getGroupId(), getGenerationID());
        electedRsInfo = evals.getBestRS();
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo("serverId: " + serverId
            + " phase 2 : will perform PhaseOneH with the preferred RS="
            + electedRsInfo);
          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
              + electedRsInfo);
        electedRsInfo = performPhaseOneHandshake(
          electedRsInfo.getServerURL(), true, false);
@@ -1056,8 +1055,7 @@
      int nChanges = ServerState.diffChanges(rsState, state);
      if (debugEnabled())
      {
        TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
            + getServerId() + " computed " + nChanges + " changes late.");
        debugInfo("computed " + nChanges + " changes late.");
      }
      /*
@@ -1136,8 +1134,8 @@
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + serverStartMsg + "\nAND RECEIVED:\n" + msg);
        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
            + msg);
      }
      // Wrap received message in a server info object
@@ -1253,8 +1251,7 @@
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + startECLSessionMsg);
        TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
      }
      // Alright set the timeout to the desired value
@@ -1320,8 +1317,8 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
        TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
            + "\nAND RECEIVED:\n" + topologyMsg);
      }
      // Alright set the timeout to the desired value
@@ -1342,13 +1339,203 @@
  }
  /**
   * Class holding evaluation results for electing the best replication server
   * for the local directory server.
   */
  static class RSEvaluations
  {
    private final int localServerId;
    private Map<Integer, ReplicationServerInfo> bestRSs;
    private final Map<Integer, Message> rsEvals =
        new HashMap<Integer, Message>();
    /**
     * Ctor.
     *
     * @param localServerId
     *          the serverId for the local directory server
     * @param rsInfos
     *          a Map of serverId => {@link ReplicationServerInfo} with all the
     *          candidate replication servers
     */
    RSEvaluations(int localServerId,
        Map<Integer, ReplicationServerInfo> rsInfos)
    {
      this.localServerId = localServerId;
      this.bestRSs = rsInfos;
    }
    private boolean keepBest(LocalEvaluation eval)
    {
      if (eval.hasAcceptedAny())
      {
        bestRSs = eval.getAccepted();
        rsEvals.putAll(eval.getRejected());
        return true;
      }
      return false;
    }
    /**
     * Sets the elected best replication server, rejecting all the other
     * replication servers with the supplied evaluation.
     *
     * @param bestRsId
     *          the serverId of the elected replication server
     * @param rejectedRSsEval
     *          the evaluation for all the rejected replication servers
     */
    private void setBestRS(int bestRsId, Message rejectedRSsEval)
    {
      for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
          this.bestRSs.entrySet().iterator(); it.hasNext();)
      {
        final Entry<Integer, ReplicationServerInfo> entry = it.next();
        final Integer rsId = entry.getKey();
        final ReplicationServerInfo rsInfo = entry.getValue();
        if (rsInfo.getServerId() != bestRsId)
        {
          it.remove();
        }
        rsEvals.put(rsId, rejectedRSsEval);
      }
    }
    private void discardAll(Message eval)
    {
      for (Integer rsId : bestRSs.keySet())
      {
        rsEvals.put(rsId, eval);
      }
    }
    private boolean foundBestRS()
    {
      return bestRSs.size() == 1;
    }
    /**
     * Returns the {@link ReplicationServerInfo} for the best replication
     * server.
     *
     * @return the {@link ReplicationServerInfo} for the best replication server
     */
    ReplicationServerInfo getBestRS()
    {
      if (foundBestRS())
      {
        return bestRSs.values().iterator().next();
      }
      return null;
    }
    /**
     * Returns the evaluations for all the candidate replication servers.
     *
     * @return a Map of serverId => Message containing the evaluation for each
     *         candidate replication servers.
     */
    Map<Integer, Message> getEvaluations()
    {
      if (foundBestRS())
      {
        final Integer bestRSServerId = getBestRS().getServerId();
        if (rsEvals.get(bestRSServerId) == null)
        {
          final Message eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
          rsEvals.put(bestRSServerId, eval);
        }
      }
      return Collections.unmodifiableMap(rsEvals);
    }
    /**
     * Returns the evaluation for the supplied replication server Id.
     * <p>
     * Note: "unknown RS" message is returned if the supplied replication server
     * was not part of the candidate replication servers.
     *
     * @param rsServerId
     *          the supplied replication server Id
     * @return the evaluation {@link Message} for the supplied replication
     *         server Id
     */
    private Message getEvaluation(int rsServerId)
    {
      final Message evaluation = getEvaluations().get(rsServerId);
      if (evaluation != null)
      {
        return evaluation;
      }
      return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return new StringBuilder()
          .append("Current best replication server Ids: ").append(
              this.bestRSs.keySet()).append(
              ", Evaluation of connected replication servers").append(
              " (ServerId => Evaluation): ").append(this.rsEvals.keySet())
          .append(", Any replication server not appearing here").append(
              " could not be contacted.").toString();
    }
  }
  /**
   * Evaluation local to one filter.
   */
  private static class LocalEvaluation
  {
    private final Map<Integer, ReplicationServerInfo> filteredRSs =
        new HashMap<Integer, ReplicationServerInfo>();
    private final Map<ReplicationServerInfo, Message> rsEvals =
        new HashMap<ReplicationServerInfo, Message>();
    private void accept(Integer rsId, ReplicationServerInfo rsInfo)
    {
      this.rsEvals.remove(rsInfo); // undo reject
      this.filteredRSs.put(rsId, rsInfo);
    }
    private void reject(ReplicationServerInfo rsInfo, Message reason)
    {
      this.filteredRSs.remove(rsInfo.getServerId()); // undo accept
      this.rsEvals.put(rsInfo, reason);
    }
    private Map<Integer, ReplicationServerInfo> getAccepted()
    {
      return filteredRSs;
    }
    public Map<Integer, Message> getRejected()
    {
      final Map<Integer, Message> result = new HashMap<Integer, Message>();
      for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet())
      {
        result.put(entry.getKey().getServerId(), entry.getValue());
      }
      return result;
    }
    private boolean hasAcceptedAny()
    {
      return !filteredRSs.isEmpty();
    }
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it or determine if we must disconnect from current one to
   * re-connect to best server.
   *
   * <p>
   * Note: this method is static for test purpose (access from unit tests)
   *
   *
   * @param firstConnection True if we run this method for the very first
   * connection of the broker. False if we run this method to determine if the
   * replication server we are currently connected to is still the best or not.
@@ -1365,15 +1552,16 @@
   * disconnect (so the best replication server is another one than the current
   * one). Null can only be returned when firstConnection is false.
   */
  public static ReplicationServerInfo computeBestReplicationServer(
  public static RSEvaluations computeBestReplicationServer(
      boolean firstConnection, int rsServerId, ServerState myState,
      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
      byte groupId, long generationId)
  {
    final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
    // Shortcut, if only one server, this is the best
    if (rsInfos.size() == 1)
    if (evals.foundBestRS())
    {
      return rsInfos.values().iterator().next();
      return evals;
    }
    /**
@@ -1386,8 +1574,6 @@
     *   local DS
     * - replication server in the same VM as local DS one
     */
    // TODO JNR log why an RS was evicted as best server
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    /*
    The list of best replication servers is filtered with each criteria. At
    each criteria, the list is replaced with the filtered one if there
@@ -1398,109 +1584,96 @@
    the local configuration. When the current method is called, for
    sure, at least one server from the list is locally configured
    */
    bestServers =
        keepBest(filterServersLocallyConfigured(bestServers), bestServers);
    filterServersLocallyConfigured(evals, localServerId);
    // Some servers with same group id ?
    bestServers =
        keepBest(filterServersWithSameGroupId(bestServers, groupId),
            bestServers);
    filterServersWithSameGroupId(evals, localServerId, groupId);
    // Some servers with same generation id ?
    Map<Integer, ReplicationServerInfo> sameGenerationId =
        filterServersWithSameGenerationId(bestServers, generationId);
    if (sameGenerationId.size() > 0)
    final boolean rssWithSameGenerationIdExist =
        filterServersWithSameGenerationId(evals, localServerId, generationId);
    if (rssWithSameGenerationIdExist)
    {
      // If some servers with the right generation id this is useful to
      // run the local DS change criteria
      bestServers =
          keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
              myState, localServerId), sameGenerationId);
      filterServersWithAllLocalDSChanges(evals, myState, localServerId);
    }
    // Some servers in the local VM or local host?
    bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
    filterServersOnSameHost(evals, localServerId);
    /**
     * Now apply the choice base on the weight to the best servers list
     */
    if (bestServers.size() == 1)
    if (evals.foundBestRS())
    {
      return bestServers.values().iterator().next();
      return evals;
    }
    /**
     * Now apply the choice based on the weight to the best servers list
     */
    if (firstConnection)
    {
      // We are not connected to a server yet
      return computeBestServerForWeight(bestServers, -1, -1);
      computeBestServerForWeight(evals, -1, -1);
    }
    /*
     * We are already connected to a RS: compute the best RS as far as the
     * weights is concerned. If this is another one, some DS must disconnect.
     */
    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
  }
  /**
   * If the filtered Map is not empty then it is returned, else return the
   * original unfiltered Map.
   *
   * @return the best fit Map between the filtered Map and the original
   * unfiltered Map.
   */
  private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
      Map<K, V> unfilteredMap)
  {
    if (!filteredMap.isEmpty())
    else
    {
      return filteredMap;
      /*
       * We are already connected to a RS: compute the best RS as far as the
       * weights is concerned. If this is another one, some DS must disconnect.
       */
      computeBestServerForWeight(evals, rsServerId, localServerId);
    }
    return unfilteredMap;
    return evals;
  }
  /**
   * Creates a new list that contains only replication servers that are locally
   * configured.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers locally configured
   * @param evals The evaluation object
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersLocallyConfigured(Map<Integer,
    ReplicationServerInfo> bestServers)
  private static void filterServersLocallyConfigured(RSEvaluations evals,
      int localServerId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      ReplicationServerInfo rsInfo = entry.getValue();
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.isLocallyConfigured())
      {
        result.put(entry.getKey(), rsInfo);
        eval.accept(rsId, rsInfo);
      }
      else
      {
        eval.reject(rsInfo,
            NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
      }
    }
    return result;
    evals.keepBest(eval);
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * passed group id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param evals The evaluation object
   * @param groupId The group id that must match
   * @return The sub list of replication servers matching the requested group id
   * (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGroupId(Map<Integer,
    ReplicationServerInfo> bestServers, byte groupId)
  private static void filterServersWithSameGroupId(RSEvaluations evals,
      int localServerId, byte groupId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      ReplicationServerInfo rsInfo = entry.getValue();
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.getGroupId() == groupId)
      {
        result.put(entry.getKey(), rsInfo);
        eval.accept(rsId, rsInfo);
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
            rsId, rsInfo.getGroupId(), localServerId, groupId));
      }
    }
    return result;
    evals.keepBest(eval);
  }
  /**
@@ -1510,28 +1683,37 @@
   * then the 'empty'(generationId==-1) replication servers are also included
   * in the result list.
   *
   * @param bestServers  The list of replication servers to filter
   * @param evals The evaluation object
   * @param generationId The generation id that must match
   * @return The sub list of replication servers matching the requested
   * generation id (which may be empty)
   * @return whether some replication server passed the filter
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGenerationId(Map<Integer,
    ReplicationServerInfo> bestServers, long generationId)
  private static boolean filterServersWithSameGenerationId(
      RSEvaluations evals, long localServerId, long generationId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    final LocalEvaluation eval = new LocalEvaluation();
    boolean emptyState = true;
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo rsInfo = entry.getValue();
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.getGenerationId() == generationId)
      {
        result.put(entry.getKey(), rsInfo);
        eval.accept(rsId, rsInfo);
        if (!rsInfo.serverState.isEmpty())
          emptyState = false;
      }
      else if (rsInfo.getGenerationId() == -1)
      {
        eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
            generationId, localServerId));
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
            rsId, rsInfo.getGenerationId(), localServerId, generationId));
      }
    }
    if (emptyState)
@@ -1543,38 +1725,27 @@
        ReplicationServerInfo rsInfo = entry.getValue();
        if (rsInfo.getGenerationId() == -1)
        {
          result.put(entry.getKey(), rsInfo);
          // will undo the reject of previously rejected RSs
          eval.accept(entry.getKey(), rsInfo);
        }
      }
    }
    return result;
    return evals.keepBest(eval);
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * latest changes from the passed DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param evals The evaluation object
   * @param localState The state of the local DS
   * @param localServerId The server id to consider for the changes
   * @return The sub list of replication servers that have the latest changes
   * from the passed DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithAllLocalDSChanges(Map<Integer,
    ReplicationServerInfo> bestServers, ServerState localState,
    int localServerId)
  private static void filterServersWithAllLocalDSChanges(
      RSEvaluations evals, ServerState localState, int localServerId)
  {
    Map<Integer, ReplicationServerInfo> upToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    // Extract the CSN of the latest change generated by the local server
    CSN myCSN = localState.getCSN(localServerId);
    if (myCSN == null)
    {
      myCSN = new CSN(0, 0, localServerId);
    }
    final CSN localCSN = getCSN(localState, localServerId);
    /**
     * Find replication servers that are up to date (or more up to date than us,
@@ -1583,60 +1754,96 @@
     * server id. If some servers are more up to date, prefer this list but take
     * only the latest CSN.
     */
    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
    boolean foundRSMoreUpToDateThanLocalDS = false;
    CSN latestRsCSN = null;
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
      if (rsCSN == null)
      {
        rsCSN = new CSN(0, 0, localServerId);
      }
      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
      // Has this replication server the latest local change ?
      if (myCSN.isOlderThanOrEqualTo(rsCSN))
      if (rsCSN.isOlderThan(localCSN))
      {
        if (myCSN.equals(rsCSN))
        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
      }
      else if (rsCSN.equals(localCSN))
      {
        // This replication server has exactly the latest change from the
        // local server
        if (!foundRSMoreUpToDateThanLocalDS)
        {
          // This replication server has exactly the latest change from the
          // local server
          upToDateServers.put(rsId, rsInfo);
        } else
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else
        {
          // This replication server is even more up to date than the local
          // server
          if (latestRsCSN == null)
          {
            // Initialize the latest CSN
            latestRsCSN = rsCSN;
          }
          if (rsCSN.isNewerThanOrEqualTo(latestRsCSN))
          {
            if (rsCSN.equals(latestRsCSN))
            {
              moreUpToDateServers.put(rsId, rsInfo);
            } else
            {
              // This RS is even more up to date, clear the list and store this
              // new RS
              moreUpToDateServers.clear();
              moreUpToDateServers.put(rsId, rsInfo);
              latestRsCSN = rsCSN;
            }
          }
          mostUpToDateEval.reject(rsInfo,
            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
      else if (rsCSN.isNewerThan(localCSN))
      {
        // This replication server is even more up to date than the local server
        if (latestRsCSN == null)
        {
          foundRSMoreUpToDateThanLocalDS = true;
          // all previous results are now outdated, reject them all
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          // Initialize the latest CSN
          latestRsCSN = rsCSN;
        }
        if (rsCSN.equals(latestRsCSN))
        {
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else if (rsCSN.isNewerThan(latestRsCSN))
        {
          // This RS is even more up to date, reject all previously accepted RSs
          // and store this new RS
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          mostUpToDateEval.accept(rsId, rsInfo);
          latestRsCSN = rsCSN;
        }
        else
        {
          mostUpToDateEval.reject(rsInfo,
            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
    }
    if (moreUpToDateServers.size() > 0)
    {
      // Prefer servers more up to date than local server
      return moreUpToDateServers;
    }
    return upToDateServers;
    evals.keepBest(mostUpToDateEval);
  }
  private static CSN getCSN(ServerState localState, int localServerId)
  {
    final CSN csn = localState.getCSN(localServerId);
    if (csn != null)
    {
      return csn;
    }
    return new CSN(0, 0, localServerId);
  }
  private static void rejectAllWithRSIsLaterThanBestRS(
      final LocalEvaluation eval, int localServerId, CSN localCSN)
  {
    for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
    {
      final String rsCSN =
          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
      final Message reason =
          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
            rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
      eval.reject(rsInfo, reason);
    }
  }
  /**
   * Creates a new list that contains only replication servers that are on the
@@ -1644,22 +1851,18 @@
   * method will gives priority to any replication server which is in the same
   * VM as this DS.
   *
   * @param bestServers
   *          The list of replication servers to filter
   * @return The sub list of replication servers being on the same host as the
   *         local DS (which may be empty)
   * @param evals The evaluation object
   */
  private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
      Map<Integer, ReplicationServerInfo> bestServers)
  private static void filterServersOnSameHost(RSEvaluations evals,
      int localServerId)
  {
    /*
     * Initially look for all servers on the same host. If we find one in the
     * same VM, then narrow the search.
     */
    boolean foundRSInSameVM = false;
    final Map<Integer, ReplicationServerInfo> result =
        new HashMap<Integer, ReplicationServerInfo>();
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
@@ -1672,24 +1875,41 @@
          {
            // An RS in the same VM will always have priority.
            // Narrow the search to only include servers in this VM.
            result.clear();
            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
            foundRSInSameVM = true;
          }
          result.put(rsId, rsInfo);
          eval.accept(rsId, rsInfo);
        }
        else if (!foundRSInSameVM)
        {
          // OK, accept RSs on the same machine because we have not found an RS
          // in the same VM yet
          result.put(rsId, rsInfo);
          eval.accept(rsId, rsInfo);
        }
        else
        {
          // Skip: we have found some RSs in the same VM, but this RS is not.
          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
              localServerId));
        }
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
            localServerId));
      }
    }
    return result;
    evals.keepBest(eval);
  }
  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
      int localServerId)
  {
    for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
    {
      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
          rsInfo.getServerId(), localServerId));
    }
  }
  /**
@@ -1699,23 +1919,18 @@
   * Warning: This method is expected to be called with at least 2 servers in
   * bestServers
   * Note: this method is static for test purpose (access from unit tests)
   * @param bestServers The list of replication servers to consider
   * @param evals The evaluation object
   * @param currentRsServerId The replication server the local server is
   *        currently connected to. -1 if the local server is not yet connected
   *        to any replication server.
   * @param localServerId The server id of the local server. This is not used
   *        when it is not connected to a replication server
   *        (currentRsServerId = -1)
   * @return The replication server the local server should be connected to
   * as far as the weight is concerned. This may be the currently used one if
   * the weight is correctly spread. If the returned value is null, the best
   * replication server is undetermined but the local server must disconnect
   * (so the best replication server is another one than the current one).
   */
  public static ReplicationServerInfo computeBestServerForWeight(
    Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
    int localServerId)
  public static void computeBestServerForWeight(RSEvaluations evals,
      int currentRsServerId, int localServerId)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    /*
     * - Compute the load goal of each RS, deducing it from the weights affected
     * to them.
@@ -1765,18 +1980,21 @@
    {
      // The local server is not connected yet, find best server to connect to,
      // taking the weights into account.
      return computeBestServerWhenNotConnected(bestServers, loadDistances);
      computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
    }
    // The local server is currently connected to a RS, let's see if it must
    // disconnect or not, taking the weights into account.
    return computeBestServerWhenConnected(bestServers, loadDistances,
        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
    else
    {
      // The local server is currently connected to a RS, let's see if it must
      // disconnect or not, taking the weights into account.
      computeBestServerWhenConnected(evals, loadDistances, localServerId,
          currentRsServerId, sumOfWeights, sumOfConnectedDSs);
    }
  }
  private static ReplicationServerInfo computeBestServerWhenNotConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances)
  private static void computeBestServerWhenNotConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    /*
     * Find the server with the current highest distance to its load goal and
     * choose it. Make an exception if every server is correctly balanced,
@@ -1815,14 +2033,15 @@
      // Choose server with the highest weight
      bestRsId = highestWeightRsId;
    }
    return bestServers.get(bestRsId);
    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
        bestRsId));
  }
  private static ReplicationServerInfo computeBestServerWhenConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
  private static void computeBestServerWhenConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
@@ -1928,45 +2147,67 @@
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            return bestServers.get(currentRsServerId);
            evals.setBestRS(currentRsServerId,
                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
            return;
          }
        }
        // Prepare a sorted list (from lowest to highest) or DS server ids
        // connected to the current RS
        ReplicationServerInfo currentRsInfo =
          bestServers.get(currentRsServerId);
        List<Integer> serversConnectedToCurrentRS =
            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
        Collections.sort(serversConnectedToCurrentRS);
        // Go through the list of DSs to disconnect and see if the local
        // server is part of them.
        int index = 0;
        while (overloadingDSsNumber > 0)
            bestServers.get(currentRsServerId);
        if (isServerOverloadingRS(localServerId, currentRsInfo,
            overloadingDSsNumber))
        {
          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
          if (serverIdToDisconnect == localServerId)
          {
            // The local server is part of the DSs to disconnect
            return null;
          }
          overloadingDSsNumber--;
          index++;
          // The local server is part of the DSs to disconnect
          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
              localServerId, currentRsServerId));
        }
        // The local server is not part of the servers to disconnect from the
        // current RS.
        else
        {
          // The local server is not part of the servers to disconnect from the
          // current RS.
          evals.setBestRS(currentRsServerId,
              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
                  currentRsServerId));
        }
      } else {
        // The average distance of the other RSs does not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
        evals.setBestRS(currentRsServerId,
            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
                currentRsServerId));
      }
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
      evals.setBestRS(currentRsServerId,
          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
              currentRsServerId));
    }
    return bestServers.get(currentRsServerId);
  }
  /**
   * Returns whether the local DS is overloading the RS.
   * <p>
   * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
   * DSs connected to this RS is ordered by serverId to use a consistent
   * ordering across all nodes in the topology. The serverIds which index in the
   * List are lower than "overloadingDSsNumber" will be evicted first.
   * <p>
   * This ordering is unfair since nodes with the lower serverIds will be
   * evicted more often than nodes with higher serverIds. However, it is a
   * consistent and reliable ordering applicable anywhere in the topology.
   */
  private static boolean isServerOverloadingRS(int localServerId,
      ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
  {
    List<Integer> serversConnectedToCurrentRS =
        new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
    Collections.sort(serversConnectedToCurrentRS);
    final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
    return idx != -1 && idx < overloadingDSsNumber;
  }
  /**
@@ -2069,8 +2310,8 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(this + " end restart : connected=" + connected
        + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
      debugInfo("end restart : connected=" + connected + " with RS("
          + getRsServerId() + ") genId=" + this.generationID);
    }
  }
@@ -2129,8 +2370,8 @@
        if (debugEnabled())
        {
          TRACER.debugInfo("ReplicationBroker.publish() Publishing a "
            + "message is not possible due to existing connection error.");
          debugInfo("publish(): Publishing a message is not possible due to"
              + " existing connection error.");
        }
        return false;
@@ -2232,8 +2473,8 @@
            // ignore
            if (debugEnabled())
            {
              TRACER.debugInfo("ReplicationBroker.publish() "
                + "Interrupted exception raised : " + e.getLocalizedMessage());
              debugInfo("publish(): Interrupted exception raised : "
                  + e.getLocalizedMessage());
            }
          }
        }
@@ -2242,8 +2483,8 @@
        // just loop.
        if (debugEnabled())
        {
          TRACER.debugInfo("ReplicationBroker.publish() "
            + "Interrupted exception raised." + e.getLocalizedMessage());
          debugInfo("publish(): Interrupted exception raised."
              + e.getLocalizedMessage());
        }
      }
    }
@@ -2393,10 +2634,10 @@
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              final ReplicationServerInfo bestServerInfo =
              final RSEvaluations evals =
                  computeBestReplicationServer(false, previousRsServerID, state,
                      replicationServerInfos, serverId, getGroupId(),
                      generationID);
                  replicationServerInfos, serverId, getGroupId(), generationID);
              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
@@ -2413,14 +2654,19 @@
                }
                else
                {
                  // TODO JNR log why an RS was evicted as best server
                  final int bestRsServerId = bestServerInfo.getServerId();
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      localSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
                      bestRsServerId,
                      baseDN.toNormalizedString(),
                      evals.getEvaluation(previousRsServerID).toString(),
                      evals.getEvaluation(bestRsServerId).toString());
                }
                logError(message);
                if (debugEnabled())
                  debugInfo("best replication servers evaluation results: "
                      + evals);
                reStart(true);
              }
@@ -2472,14 +2718,12 @@
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication Server to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
   * currently connected and
   * Gets the States of all the Replicas currently in the Topology. When this
   * method is called, a Monitoring message will be sent to the Replication
   * Server to which this domain is currently connected so that it computes a
   * table containing information about all Directory Servers in the topology.
   * This Computation involves communications will all the servers currently
   * connected and
   *
   * @return The States of all Replicas in the topology (except us)
   */
@@ -2539,9 +2783,8 @@
  public void stop()
  {
    if (debugEnabled())
      TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
          + " and will close the connection to replication server "
          + rsServerId + " for domain " + getBaseDN());
      debugInfo("is stopping and will close the connection to"
          + " replication server " + rsServerId);
    synchronized (startStopLock)
    {
@@ -2786,7 +3029,7 @@
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
      debugInfo("receive TopologyMsg=" + topoMsg);
    // Store new DS list
    dsList = topoMsg.getDsList();
@@ -2876,8 +3119,7 @@
    else
    {
      if (debugEnabled())
        TRACER.debugInfo(this
          + " is not configured to send CSN heartbeat interval");
        debugInfo("is not configured to send CSN heartbeat interval");
    }
  }
@@ -3003,4 +3245,10 @@
    }
    return sb.toString();
  }
  private void debugInfo(String message)
  {
    TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
        + " and serverId=" + getServerId() + " " + message);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -25,15 +25,16 @@
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
package org.opends.server.replication.service;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.MapEntry;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageDescriptor;
import org.opends.messages.Severity;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
@@ -42,12 +43,15 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker.RSEvaluations;
import org.opends.server.replication.service.ReplicationBroker.ReplicationServerInfo;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.util.Collections.*;
import static org.assertj.core.data.MapEntry.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.service.ReplicationBroker.*;
@@ -106,11 +110,12 @@
    Map<Integer, ReplicationServerInfo> rsInfos =
        newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
  }
  private Map<Integer, ReplicationServerInfo> newRSInfos(
@@ -156,11 +161,12 @@
    Map<Integer, ReplicationServerInfo> rsInfos =
        newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
  }
  /**
@@ -187,17 +193,16 @@
    Map<Integer, ReplicationServerInfo> rsInfos =
        newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
  }
  /**
   * Test with one replication server, up to date.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test1ServerUp() throws Exception
@@ -220,17 +225,16 @@
    Map<Integer, ReplicationServerInfo> rsInfos =
        newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
  }
  /**
   * Test with 2 replication servers, up to date.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test2ServersUp() throws Exception
@@ -260,17 +264,18 @@
    Map<Integer, ReplicationServerInfo> rsInfos = newRSInfos(
        newRSInfo(11, LOOSER1, aState1, 0, 1),
        newRSInfo(12, WINNER, aState2, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(),
        entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
        entry(12, NOTE_BEST_RS));
  }
  /**
   * Test with 2 replication servers, up to date, but 2 different group ids.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testDiffGroup2ServersUp() throws Exception
@@ -302,17 +307,65 @@
    Map<Integer, ReplicationServerInfo> rsInfos = newRSInfos(
        newRSInfo(11, WINNER, aState1, 0, 1),
        newRSInfo(12, LOOSER1, aState2, 0, 2));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(),
        entry(11, NOTE_BEST_RS),
        entry(12, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS));
  }
  private void containsOnly(final Map<Integer, Message> evaluations,
      MapEntry... entries)
  {
    final List<MapEntry> notFound = new ArrayList<MapEntry>(Arrays.asList(entries));
    for (Iterator<MapEntry> iter = notFound.iterator(); iter.hasNext();)
    {
      final MapEntry entry = iter.next();
      final Message reason = evaluations.get(entry.key);
      if (reason != null && reason.getDescriptor().equals(entry.value))
      {
        iter.remove();
      }
    }
    if (!notFound.isEmpty())
    {
      final StringBuilder sb = new StringBuilder("expecting:\n");
      sb.append("  <").append(getDescription(evaluations)).append(">\n");
      sb.append("   to contain:\n");
      sb.append("  <").append(getDescription(Arrays.asList(entries))).append(">\n");
      sb.append("   but could not find:\n");
      sb.append("  <").append(getDescription(notFound)).append(">");
      throw new AssertionError(sb.toString());
    }
    Assertions.assertThat(evaluations).hasSize(entries.length);
  }
  private Map<Integer, String> getDescription(Map<Integer, Message> evaluations)
  {
    final Map<Integer, String> result = new LinkedHashMap<Integer, String>();
    for (Entry<Integer, Message> entry : evaluations.entrySet())
    {
      result.put(entry.getKey(), entry.getValue().getDescriptor().getKey());
    }
    return result;
  }
  private List<MapEntry> getDescription(List<MapEntry> entries)
  {
    final List<MapEntry> result = new ArrayList<MapEntry>();
    for (MapEntry entry : entries)
    {
      result.add(entry(entry.key, ((MessageDescriptor) entry.value).getKey()));
    }
    return result;
  }
  /**
   * Test with 2 replication servers, none of them from our group id.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testNotOurGroup() throws Exception
@@ -342,17 +395,18 @@
    Map<Integer, ReplicationServerInfo> rsInfos = newRSInfos(
        newRSInfo(11, LOOSER1, aState1, 0, 2),
        newRSInfo(12, WINNER, aState2, 0, 2));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(),
        entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
        entry(12, NOTE_BEST_RS));
  }
  /**
   * Test with 3 replication servers, up to date.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test3ServersUp() throws Exception
@@ -389,17 +443,19 @@
        newRSInfo(11, LOOSER1, aState1, 0, 1),
        newRSInfo(12, LOOSER2, aState2, 0, 1),
        newRSInfo(13, WINNER, aState3, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(),
        entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
        entry(12, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
        entry(13, NOTE_BEST_RS));
  }
  /**
   * Test with 3 replication servers, up to date, but 2 different group ids.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testDiffGroup3ServersUp() throws Exception
@@ -438,17 +494,19 @@
        // This server has less changes than looser2 but it has the same
        // group id as us so he should be the winner
        newRSInfo(13, WINNER, aState3, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(),
        entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
        entry(12, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS),
        entry(13, NOTE_BEST_RS));
  }
  /**
   * Test with one replication server, late.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test1ServerLate() throws Exception
@@ -471,43 +529,60 @@
    Map<Integer, ReplicationServerInfo> rsInfos =
        newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte) 1, 0);
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
  }
  @DataProvider(name = "create3ServersData")
  public Object[][] create3ServersData() {
    return new Object[][] {
        // first RS is up to date, the others are late none is local
        { 4, 2, 3, false, 1, 2, 3, false, 2, 3, 4, false},
        { 4, 2, 3, false,
          1, 2, 3, false,
          2, 3, 4, false},
        // test that the local RS  is chosen first when all up to date
        { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
        { 4, 2, 3, true,
          4, 2, 3, false,
          4, 2, 3, false},
        // test that the local ServerID is more important than the others
        { 4, 0, 0, false, 2, 100, 100, false, 1, 100, 100, false},
        { 4, 0, 0, false,
          2, 100, 100, false,
          1, 100, 100, false},
        // test that a remote RS is chosen first when up to date when the local
        // one is late
        { 4, 1, 1, false, 3, 1, 1, true, 3, 1, 1, false},
        { 4, 1, 1, false,
          3, 1, 1, true,
          3, 1, 1, false},
        // test that the local RS is not chosen first when it is missing
        // local changes
        { 4, 1, 1, false, 3, 2, 3, false, 1, 1, 1, true},
        { 4, 1, 1, false,
          3, 2, 3, false,
          1, 1, 1, true},
        // test that a RS which is more up to date than the DS is chosen
        { 5, 1, 1, false, 2, 0, 0, false, 1, 1, 1, false},
        { 5, 1, 1, false,
          2, 0, 0, false,
          1, 1, 1, false},
        // test that a RS which is more up to date than the DS is chosen even
        // is some RS with the same last change from the DS
        { 5, 1, 1, false, 4, 0, 0, false, 4, 1, 1, false},
        { 5, 1, 1, false,
          4, 0, 0, false,
          4, 1, 1, false},
        // test that the local RS is chosen first when it is missing
        // the same local changes as the other RSs
        { 3, 1, 1, true, 2, 1, 1, false, 3, 1, 1, false},
        { 3, 1, 1, true,
          2, 1, 1, false,
          3, 1, 1, false},
        };
  }
@@ -564,13 +639,31 @@
        newRSInfo(11, LOOSER1, aState1, 0, 1),
        newRSInfo(12, WINNER, aState2, 0, 1),
        newRSInfo(13, LOOSER2, aState3, 0, 1));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte) 1, 0);
    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    final boolean winnerIsLatestRS = winnerT1 > 4 && looser1T1 == 4 && looser2T1 == 4;
    containsOnly(evals.getEvaluations(),
        entry(11, getEval1(winnerIsLocal, looser1IsLocal, winnerIsLatestRS)),
        entry(12, NOTE_BEST_RS),
        entry(13, getEval1(winnerIsLocal, looser2IsLocal, winnerIsLatestRS)));
  }
  private MessageDescriptor getEval1(boolean winnerIsLocal, boolean looserIsLocal, boolean winnerIsLatestRS)
  {
    if (winnerIsLocal && !looserIsLocal)
    {
      return NOTE_RS_ON_DIFFERENT_VM_THAN_DS;
    }
    else if (winnerIsLatestRS)
    {
      return NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS;
    }
    return NOTE_RS_LATER_THAN_LOCAL_DS;
  }
  @DataProvider(name = "test3ServersMoreCriteria")
@@ -578,15 +671,21 @@
    return new Object[][] {
        // Test that a RS is chosen if its group is ok whereas the other parameters
        // are not ok
        { 1L, 1L, (byte)1, false, 4L, 0L, (byte)2, false, 4L, 0L, (byte)3, false},
        { 1L, 1L, 1, false,
          4L, 0L, 2, false,
          4L, 0L, 3, false},
        // Test that a RS is chosen if its genid is ok (all RS with same group)
        // and state is not ok
        { 1L, 0L, (byte)1, false, 4L, 1L, (byte)1, false, 4L, 2L, (byte)1, false},
        { 1L, 0L, 1, false,
          4L, 1L, 1, false,
          4L, 2L, 1, false},
        // Test that a RS is chosen if all servers have wrong genid and group id
        // but it is local
        { 1L, 1L, (byte)2, true, 4L, 2L, (byte)3, false, 5L, 3L, (byte)4, false}
        { 1L, 1L, 2, true,
          4L, 2L, 3, false,
          5L, 3L, 4, false}
        };
  }
@@ -595,9 +694,9 @@
   */
  @Test(dataProvider =  "test3ServersMoreCriteria")
  public void test3ServersMoreCriteria(
      long winnerT1, long winnerGenId, byte winnerGroupId, boolean winnerIsLocal,
      long looser1T1, long looser1GenId, byte looser1GroupId, boolean looser1IsLocal,
      long looser2T1, long looser2GenId, byte looser2GroupId, boolean looser2IsLocal)
      long winnerT1, long winnerGenId, int winnerGroupId, boolean winnerIsLocal,
      long looser1T1, long looser1GenId, int looser1GroupId, boolean looser1IsLocal,
      long looser2T1, long looser2GenId, int looser2GroupId, boolean looser2IsLocal)
      throws Exception
  {
    String testCase = "test3ServersMoreCriteria";
@@ -635,13 +734,30 @@
        newRSInfo(11, LOOSER1, aState1, looser1GenId, looser1GroupId),
        newRSInfo(12, WINNER, aState2, winnerGenId, winnerGroupId),
        newRSInfo(13, LOOSER2, aState3, looser2GenId, looser2GroupId));
    ReplicationServerInfo bestServer =
    RSEvaluations evals =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte) 1, 0);
    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
    assertEquals(evals.getBestRS().getServerURL(), WINNER,
        "Wrong best replication server.");
    containsOnly(evals.getEvaluations(),
        entry(11, getEval2(winnerGroupId == looser1GroupId, winnerIsLocal, looser1IsLocal)),
        entry(12, NOTE_BEST_RS),
        entry(13, getEval2(winnerGroupId == looser2GroupId, winnerIsLocal, looser2IsLocal)));
  }
  private MessageDescriptor getEval2(boolean sameGroupId, boolean winnerIsLocal, boolean looserIsLocal)
  {
    if (winnerIsLocal && !looserIsLocal)
    {
      return NOTE_RS_ON_DIFFERENT_VM_THAN_DS;
    }
    else if (!sameGroupId)
    {
      return NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS;
    }
    return NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS;
  }
  @SuppressWarnings("unchecked")
@@ -691,7 +807,7 @@
      rsInfos,
      -1, // current RS id
      -1, // local DS id
      rsInfos.values().iterator().next().getServerURL(), // winner url
      "BwinnerHost:123", // winner url
    };
    /**
@@ -738,7 +854,7 @@
      rsInfos,
      -1, // current RS id
      -1, // local DS id
      rsInfos.values().iterator().next().getServerURL(), // winner url
      "DwinnerHost:123", // winner url
    };
    /**
@@ -1311,8 +1427,9 @@
    debugInfo("Starting " + testCase);
    ReplicationServerInfo bestServer =
      computeBestServerForWeight(servers, currentRsServerId, localServerId);
    final RSEvaluations evals = new RSEvaluations(localServerId, servers);
    computeBestServerForWeight(evals, currentRsServerId, localServerId);
    final ReplicationServerInfo bestServer = evals.getBestRS();
    if (winnerUrl == null)
    {