From 5c3362e4dd1ee79c10160cfd128b70cdb9a2dee1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 28 Oct 2013 09:06:07 +0000
Subject: [PATCH] OPENDJ-1053 (CR-2520) Improve logging of replication load balancing and fail-over in order to diagnose the causes of these events

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  722 +++++++++++++++++++++++++++++++++++++------------------
 1 files changed, 485 insertions(+), 237 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1072f55..e755741 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -806,9 +806,8 @@
        * out which one is the best to connect to.
        */
       if (debugEnabled())
-        TRACER.debugInfo("serverId: " + serverId
-          + " phase 1 : will perform PhaseOneH with each RS in "
-          + " order to elect the preferred one");
+        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to "
+            + "elect the preferred one");
 
       // Get info from every available replication servers
       replicationServerInfos = collectReplicationServersInfo();
@@ -818,14 +817,14 @@
       if (replicationServerInfos.size() > 0)
       {
         // At least one server answered, find the best one.
-        electedRsInfo = computeBestReplicationServer(true, -1, state,
-          replicationServerInfos, serverId, getGroupId(), getGenerationID());
+        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
+            replicationServerInfos, serverId, getGroupId(), getGenerationID());
+        electedRsInfo = evals.getBestRS();
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
-          TRACER.debugInfo("serverId: " + serverId
-            + " phase 2 : will perform PhaseOneH with the preferred RS="
-            + electedRsInfo);
+          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
+              + electedRsInfo);
         electedRsInfo = performPhaseOneHandshake(
           electedRsInfo.getServerURL(), true, false);
 
@@ -1056,8 +1055,7 @@
       int nChanges = ServerState.diffChanges(rsState, state);
       if (debugEnabled())
       {
-        TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
-            + getServerId() + " computed " + nChanges + " changes late.");
+        debugInfo("computed " + nChanges + " changes late.");
       }
 
       /*
@@ -1136,8 +1134,8 @@
       ReplicationMsg msg = localSession.receive();
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
-            + serverStartMsg + "\nAND RECEIVED:\n" + msg);
+        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
+            + msg);
       }
 
       // Wrap received message in a server info object
@@ -1253,8 +1251,7 @@
       // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
-            + startECLSessionMsg);
+        TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
       }
 
       // Alright set the timeout to the desired value
@@ -1320,8 +1317,8 @@
 
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
-            + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
+        TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
+            + "\nAND RECEIVED:\n" + topologyMsg);
       }
 
       // Alright set the timeout to the desired value
@@ -1342,13 +1339,203 @@
   }
 
   /**
+   * Class holding evaluation results for electing the best replication server
+   * for the local directory server.
+   */
+  static class RSEvaluations
+  {
+    /** ServerId of the local directory server. */
+    private final int localServerId;
+    /** Candidate replication servers which were not rejected so far. */
+    private Map<Integer, ReplicationServerInfo> bestRSs;
+    /** Evaluations recorded so far, keyed by replication server Id. */
+    private final Map<Integer, Message> rsEvals =
+        new HashMap<Integer, Message>();
+
+    /**
+     * Ctor.
+     *
+     * @param localServerId the serverId for the local directory server
+     * @param rsInfos
+     *          a Map of serverId => {@link ReplicationServerInfo} with all the
+     *          candidate replication servers. It is copied, keeping its
+     *          iteration order, and is never modified by this object.
+     */
+    RSEvaluations(int localServerId,
+        Map<Integer, ReplicationServerInfo> rsInfos)
+    {
+      this.localServerId = localServerId;
+      // setBestRS() removes entries from bestRSs: never alias the caller's Map
+      this.bestRSs = rsInfos == null ? null :
+        new java.util.LinkedHashMap<Integer, ReplicationServerInfo>(rsInfos);
+    }
+
+    /**
+     * Replaces the candidates with the replication servers accepted by the
+     * supplied filter and records its rejections, unless it accepted none.
+     *
+     * @param eval the evaluation local to one filter
+     * @return whether the filter accepted at least one replication server
+     */
+    private boolean keepBest(LocalEvaluation eval)
+    {
+      final boolean acceptedAny = eval.hasAcceptedAny();
+      if (acceptedAny)
+      {
+        bestRSs = eval.getAccepted();
+        rsEvals.putAll(eval.getRejected());
+      }
+      return acceptedAny;
+    }
+
+    /**
+     * Sets the elected best replication server: all the other candidates are
+     * removed and the supplied evaluation is recorded for every candidate,
+     * the elected one included.
+     *
+     * @param bestRsId the serverId of the elected replication server
+     * @param rejectedRSsEval the evaluation explaining the election
+     */
+    private void setBestRS(int bestRsId, Message rejectedRSsEval)
+    {
+      final Iterator<Entry<Integer, ReplicationServerInfo>> it =
+          bestRSs.entrySet().iterator();
+      while (it.hasNext())
+      {
+        final Entry<Integer, ReplicationServerInfo> candidate = it.next();
+        rsEvals.put(candidate.getKey(), rejectedRSsEval);
+        if (candidate.getValue().getServerId() != bestRsId)
+        {
+          it.remove();
+        }
+      }
+    }
+
+    /** Records the supplied evaluation for all the remaining candidates. */
+    private void discardAll(Message eval)
+    {
+      for (Integer rsId : bestRSs.keySet())
+      {
+        rsEvals.put(rsId, eval);
+      }
+    }
+
+    /** Returns whether exactly one candidate replication server remains. */
+    private boolean foundBestRS()
+    {
+      return bestRSs.size() == 1;
+    }
+
+    /**
+     * Returns the {@link ReplicationServerInfo} for the best replication
+     * server.
+     *
+     * @return the best replication server, or null if none is elected yet
+     */
+    ReplicationServerInfo getBestRS()
+    {
+      return foundBestRS() ? bestRSs.values().iterator().next() : null;
+    }
+
+    /**
+     * Returns the evaluations for all the candidate replication servers.
+     *
+     * @return an unmodifiable Map of serverId => Message containing the
+     *         evaluation for each candidate replication server
+     */
+    Map<Integer, Message> getEvaluations()
+    {
+      if (foundBestRS())
+      {
+        final Integer bestRSServerId = getBestRS().getServerId();
+        if (rsEvals.get(bestRSServerId) == null)
+        {
+          rsEvals.put(bestRSServerId,
+              NOTE_BEST_RS.get(bestRSServerId, localServerId));
+        }
+      }
+      return Collections.unmodifiableMap(rsEvals);
+    }
+
+    /**
+     * Returns the evaluation for the supplied replication server Id, or the
+     * "unknown RS" message if it was not part of the candidate replication
+     * servers.
+     *
+     * @param rsServerId the supplied replication server Id
+     * @return the evaluation {@link Message} for the supplied replication
+     *         server Id
+     */
+    private Message getEvaluation(int rsServerId)
+    {
+      final Message evaluation = getEvaluations().get(rsServerId);
+      return evaluation != null ? evaluation
+          : NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return "Current best replication server Ids: " + bestRSs.keySet()
+          + ", Evaluation of connected replication servers"
+          + " (ServerId => Evaluation): " + rsEvals.keySet()
+          + ", Any replication server not appearing here"
+          + " could not be contacted.";
+    }
+  }
+
+  /**
+   * Evaluation local to one filter.
+   */
+  private static class LocalEvaluation
+  {
+    private final Map<Integer, ReplicationServerInfo> filteredRSs =
+        new HashMap<Integer, ReplicationServerInfo>();
+    private final Map<ReplicationServerInfo, Message> rsEvals =
+        new HashMap<ReplicationServerInfo, Message>();
+
+    private void accept(Integer rsId, ReplicationServerInfo rsInfo)
+    {
+      this.rsEvals.remove(rsInfo); // undo reject
+      this.filteredRSs.put(rsId, rsInfo);
+    }
+
+    private void reject(ReplicationServerInfo rsInfo, Message reason)
+    {
+      this.filteredRSs.remove(rsInfo.getServerId()); // undo accept
+      this.rsEvals.put(rsInfo, reason);
+    }
+
+    private Map<Integer, ReplicationServerInfo> getAccepted()
+    {
+      return new java.util.LinkedHashMap<Integer, ReplicationServerInfo>(
+          filteredRSs); // a copy: callers call reject() while iterating it
+    }
+
+    public Map<Integer, Message> getRejected()
+    {
+      final Map<Integer, Message> result = new HashMap<Integer, Message>();
+      for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet())
+      {
+        result.put(entry.getKey().getServerId(), entry.getValue());
+      }
+      return result;
+    }
+
+    private boolean hasAcceptedAny()
+    {
+      return !filteredRSs.isEmpty();
+    }
+  }
+
+  /**
    * Returns the replication server that best fits our need so that we can
    * connect to it or determine if we must disconnect from current one to
    * re-connect to best server.
-   *
+   * <p>
    * Note: this method is static for test purpose (access from unit tests)
    *
-   *
    * @param firstConnection True if we run this method for the very first
    * connection of the broker. False if we run this method to determine if the
    * replication server we are currently connected to is still the best or not.
@@ -1365,15 +1552,16 @@
    * disconnect (so the best replication server is another one than the current
    * one). Null can only be returned when firstConnection is false.
    */
-  public static ReplicationServerInfo computeBestReplicationServer(
+  public static RSEvaluations computeBestReplicationServer(
       boolean firstConnection, int rsServerId, ServerState myState,
       Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
       byte groupId, long generationId)
   {
+    final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
     // Shortcut, if only one server, this is the best
-    if (rsInfos.size() == 1)
+    if (evals.foundBestRS())
     {
-      return rsInfos.values().iterator().next();
+      return evals;
     }
 
     /**
@@ -1386,8 +1574,6 @@
      *   local DS
      * - replication server in the same VM as local DS one
      */
-    // TODO JNR log why an RS was evicted as best server
-    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
     /*
     The list of best replication servers is filtered with each criteria. At
     each criteria, the list is replaced with the filtered one if there
@@ -1398,109 +1584,96 @@
     the local configuration. When the current method is called, for
     sure, at least one server from the list is locally configured
     */
-    bestServers =
-        keepBest(filterServersLocallyConfigured(bestServers), bestServers);
+    filterServersLocallyConfigured(evals, localServerId);
     // Some servers with same group id ?
-    bestServers =
-        keepBest(filterServersWithSameGroupId(bestServers, groupId),
-            bestServers);
+    filterServersWithSameGroupId(evals, localServerId, groupId);
     // Some servers with same generation id ?
-    Map<Integer, ReplicationServerInfo> sameGenerationId =
-        filterServersWithSameGenerationId(bestServers, generationId);
-    if (sameGenerationId.size() > 0)
+    final boolean rssWithSameGenerationIdExist =
+        filterServersWithSameGenerationId(evals, localServerId, generationId);
+    if (rssWithSameGenerationIdExist)
     {
       // If some servers with the right generation id this is useful to
       // run the local DS change criteria
-      bestServers =
-          keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
-              myState, localServerId), sameGenerationId);
+      filterServersWithAllLocalDSChanges(evals, myState, localServerId);
     }
     // Some servers in the local VM or local host?
-    bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
+    filterServersOnSameHost(evals, localServerId);
 
-    /**
-     * Now apply the choice base on the weight to the best servers list
-     */
-    if (bestServers.size() == 1)
+    if (evals.foundBestRS())
     {
-      return bestServers.values().iterator().next();
+      return evals;
     }
 
+    /**
+     * Now apply the choice based on the weight to the best servers list
+     */
     if (firstConnection)
     {
       // We are not connected to a server yet
-      return computeBestServerForWeight(bestServers, -1, -1);
+      computeBestServerForWeight(evals, -1, -1);
     }
-    /*
-     * We are already connected to a RS: compute the best RS as far as the
-     * weights is concerned. If this is another one, some DS must disconnect.
-     */
-    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
-  }
-
-  /**
-   * If the filtered Map is not empty then it is returned, else return the
-   * original unfiltered Map.
-   *
-   * @return the best fit Map between the filtered Map and the original
-   * unfiltered Map.
-   */
-  private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
-      Map<K, V> unfilteredMap)
-  {
-    if (!filteredMap.isEmpty())
+    else
     {
-      return filteredMap;
+      /*
+       * We are already connected to a RS: compute the best RS as far as the
+       * weights are concerned. If this is another one, some DS must disconnect.
+       */
+      computeBestServerForWeight(evals, rsServerId, localServerId);
     }
-    return unfilteredMap;
+    return evals;
   }
 
   /**
    * Creates a new list that contains only replication servers that are locally
    * configured.
-   * @param bestServers The list of replication servers to filter
-   * @return The sub list of replication servers locally configured
+   * @param evals The evaluation object
    */
-  private static Map<Integer, ReplicationServerInfo>
-    filterServersLocallyConfigured(Map<Integer,
-    ReplicationServerInfo> bestServers)
+  private static void filterServersLocallyConfigured(RSEvaluations evals,
+      int localServerId)
   {
-    Map<Integer, ReplicationServerInfo> result =
-      new HashMap<Integer, ReplicationServerInfo>();
-    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+    final LocalEvaluation eval = new LocalEvaluation();
+    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
     {
-      ReplicationServerInfo rsInfo = entry.getValue();
+      final Integer rsId = entry.getKey();
+      final ReplicationServerInfo rsInfo = entry.getValue();
       if (rsInfo.isLocallyConfigured())
       {
-        result.put(entry.getKey(), rsInfo);
+        eval.accept(rsId, rsInfo);
+      }
+      else
+      {
+        eval.reject(rsInfo,
+            NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
       }
     }
-    return result;
+    evals.keepBest(eval);
   }
 
   /**
    * Creates a new list that contains only replication servers that have the
    * passed group id, from a passed replication server list.
-   * @param bestServers The list of replication servers to filter
+   * @param evals The evaluation object
    * @param groupId The group id that must match
-   * @return The sub list of replication servers matching the requested group id
-   * (which may be empty)
    */
-  private static Map<Integer, ReplicationServerInfo>
-    filterServersWithSameGroupId(Map<Integer,
-    ReplicationServerInfo> bestServers, byte groupId)
+  private static void filterServersWithSameGroupId(RSEvaluations evals,
+      int localServerId, byte groupId)
   {
-    Map<Integer, ReplicationServerInfo> result =
-      new HashMap<Integer, ReplicationServerInfo>();
-    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+    final LocalEvaluation eval = new LocalEvaluation();
+    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
     {
-      ReplicationServerInfo rsInfo = entry.getValue();
+      final Integer rsId = entry.getKey();
+      final ReplicationServerInfo rsInfo = entry.getValue();
       if (rsInfo.getGroupId() == groupId)
       {
-        result.put(entry.getKey(), rsInfo);
+        eval.accept(rsId, rsInfo);
+      }
+      else
+      {
+        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
+            rsId, rsInfo.getGroupId(), localServerId, groupId));
       }
     }
-    return result;
+    evals.keepBest(eval);
   }
 
   /**
@@ -1510,28 +1683,37 @@
    * then the 'empty'(generationId==-1) replication servers are also included
    * in the result list.
    *
-   * @param bestServers  The list of replication servers to filter
+   * @param evals The evaluation object
    * @param generationId The generation id that must match
-   * @return The sub list of replication servers matching the requested
-   * generation id (which may be empty)
+   * @return whether some replication server passed the filter
    */
-  private static Map<Integer, ReplicationServerInfo>
-    filterServersWithSameGenerationId(Map<Integer,
-    ReplicationServerInfo> bestServers, long generationId)
+  private static boolean filterServersWithSameGenerationId(
+      RSEvaluations evals, long localServerId, long generationId)
   {
-    Map<Integer, ReplicationServerInfo> result =
-      new HashMap<Integer, ReplicationServerInfo>();
+    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
+    final LocalEvaluation eval = new LocalEvaluation();
     boolean emptyState = true;
 
     for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo rsInfo = entry.getValue();
+      final Integer rsId = entry.getKey();
+      final ReplicationServerInfo rsInfo = entry.getValue();
       if (rsInfo.getGenerationId() == generationId)
       {
-        result.put(entry.getKey(), rsInfo);
+        eval.accept(rsId, rsInfo);
         if (!rsInfo.serverState.isEmpty())
           emptyState = false;
       }
+      else if (rsInfo.getGenerationId() == -1)
+      {
+        eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
+            generationId, localServerId));
+      }
+      else
+      {
+        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
+            rsId, rsInfo.getGenerationId(), localServerId, generationId));
+      }
     }
 
     if (emptyState)
@@ -1543,38 +1725,27 @@
         ReplicationServerInfo rsInfo = entry.getValue();
         if (rsInfo.getGenerationId() == -1)
         {
-          result.put(entry.getKey(), rsInfo);
+          // will undo the reject of previously rejected RSs
+          eval.accept(entry.getKey(), rsInfo);
         }
       }
     }
-    return result;
+
+    return evals.keepBest(eval);
   }
 
   /**
    * Creates a new list that contains only replication servers that have the
    * latest changes from the passed DS, from a passed replication server list.
-   * @param bestServers The list of replication servers to filter
+   * @param evals The evaluation object
    * @param localState The state of the local DS
    * @param localServerId The server id to consider for the changes
-   * @return The sub list of replication servers that have the latest changes
-   * from the passed DS (which may be empty)
    */
-  private static Map<Integer, ReplicationServerInfo>
-    filterServersWithAllLocalDSChanges(Map<Integer,
-    ReplicationServerInfo> bestServers, ServerState localState,
-    int localServerId)
+  private static void filterServersWithAllLocalDSChanges(
+      RSEvaluations evals, ServerState localState, int localServerId)
   {
-    Map<Integer, ReplicationServerInfo> upToDateServers =
-      new HashMap<Integer, ReplicationServerInfo>();
-    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
-      new HashMap<Integer, ReplicationServerInfo>();
-
     // Extract the CSN of the latest change generated by the local server
-    CSN myCSN = localState.getCSN(localServerId);
-    if (myCSN == null)
-    {
-      myCSN = new CSN(0, 0, localServerId);
-    }
+    final CSN localCSN = getCSN(localState, localServerId);
 
     /**
      * Find replication servers that are up to date (or more up to date than us,
@@ -1583,60 +1754,96 @@
      * server id. If some servers are more up to date, prefer this list but take
      * only the latest CSN.
      */
+    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
+    boolean foundRSMoreUpToDateThanLocalDS = false;
     CSN latestRsCSN = null;
-    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
     {
       final Integer rsId = entry.getKey();
       final ReplicationServerInfo rsInfo = entry.getValue();
-      CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
-      if (rsCSN == null)
-      {
-        rsCSN = new CSN(0, 0, localServerId);
-      }
+      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
 
       // Has this replication server the latest local change ?
-      if (myCSN.isOlderThanOrEqualTo(rsCSN))
+      if (rsCSN.isOlderThan(localCSN))
       {
-        if (myCSN.equals(rsCSN))
+        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
+            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
+      }
+      else if (rsCSN.equals(localCSN))
+      {
+        // This replication server has exactly the latest change from the
+        // local server
+        if (!foundRSMoreUpToDateThanLocalDS)
         {
-          // This replication server has exactly the latest change from the
-          // local server
-          upToDateServers.put(rsId, rsInfo);
-        } else
+          mostUpToDateEval.accept(rsId, rsInfo);
+        }
+        else
         {
-          // This replication server is even more up to date than the local
-          // server
-          if (latestRsCSN == null)
-          {
-            // Initialize the latest CSN
-            latestRsCSN = rsCSN;
-          }
-          if (rsCSN.isNewerThanOrEqualTo(latestRsCSN))
-          {
-            if (rsCSN.equals(latestRsCSN))
-            {
-              moreUpToDateServers.put(rsId, rsInfo);
-            } else
-            {
-              // This RS is even more up to date, clear the list and store this
-              // new RS
-              moreUpToDateServers.clear();
-              moreUpToDateServers.put(rsId, rsInfo);
-              latestRsCSN = rsCSN;
-            }
-          }
+          mostUpToDateEval.reject(rsInfo,
+            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
+        }
+      }
+      else if (rsCSN.isNewerThan(localCSN))
+      {
+        // This replication server is even more up to date than the local server
+        if (latestRsCSN == null)
+        {
+          foundRSMoreUpToDateThanLocalDS = true;
+          // all previous results are now outdated, reject them all
+          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
+              localCSN);
+          // Initialize the latest CSN
+          latestRsCSN = rsCSN;
+        }
+
+        if (rsCSN.equals(latestRsCSN))
+        {
+          mostUpToDateEval.accept(rsId, rsInfo);
+        }
+        else if (rsCSN.isNewerThan(latestRsCSN))
+        {
+          // This RS is even more up to date, reject all previously accepted RSs
+          // and store this new RS
+          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
+              localCSN);
+          mostUpToDateEval.accept(rsId, rsInfo);
+          latestRsCSN = rsCSN;
+        }
+        else
+        {
+          mostUpToDateEval.reject(rsInfo,
+            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
         }
       }
     }
-    if (moreUpToDateServers.size() > 0)
-    {
-      // Prefer servers more up to date than local server
-      return moreUpToDateServers;
-    }
-    return upToDateServers;
+    evals.keepBest(mostUpToDateEval);
   }
 
+  /**
+   * Returns the CSN the supplied state holds for the supplied serverId, or
+   * CSN(0, 0, serverId) when the state holds none.
+   */
+  private static CSN getCSN(ServerState localState, int localServerId)
+  {
+    final CSN knownCSN = localState.getCSN(localServerId);
+    return knownCSN != null ? knownCSN : new CSN(0, 0, localServerId);
+  }
 
+  /** Rejects all the accepted RSs: a more up to date RS has been found. */
+  private static void rejectAllWithRSIsLaterThanBestRS(
+      final LocalEvaluation eval, int localServerId, CSN localCSN)
+  {
+    for (ReplicationServerInfo acceptedRS : eval.getAccepted().values())
+    {
+      final CSN rsCSN = getCSN(acceptedRS.getServerState(), localServerId);
+      eval.reject(acceptedRS,
+          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+            acceptedRS.getServerId(), rsCSN.toStringUI(), localServerId,
+            localCSN.toStringUI()));
+    }
+  }
 
   /**
    * Creates a new list that contains only replication servers that are on the
@@ -1644,22 +1851,18 @@
    * method will gives priority to any replication server which is in the same
    * VM as this DS.
    *
-   * @param bestServers
-   *          The list of replication servers to filter
-   * @return The sub list of replication servers being on the same host as the
-   *         local DS (which may be empty)
+   * @param evals The evaluation object
    */
-  private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
-      Map<Integer, ReplicationServerInfo> bestServers)
+  private static void filterServersOnSameHost(RSEvaluations evals,
+      int localServerId)
   {
     /*
      * Initially look for all servers on the same host. If we find one in the
      * same VM, then narrow the search.
      */
     boolean foundRSInSameVM = false;
-    final Map<Integer, ReplicationServerInfo> result =
-        new HashMap<Integer, ReplicationServerInfo>();
-    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+    final LocalEvaluation eval = new LocalEvaluation();
+    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
     {
       final Integer rsId = entry.getKey();
       final ReplicationServerInfo rsInfo = entry.getValue();
@@ -1672,24 +1875,41 @@
           {
             // An RS in the same VM will always have priority.
             // Narrow the search to only include servers in this VM.
-            result.clear();
+            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
             foundRSInSameVM = true;
           }
-          result.put(rsId, rsInfo);
+          eval.accept(rsId, rsInfo);
         }
         else if (!foundRSInSameVM)
         {
           // OK, accept RSs on the same machine because we have not found an RS
           // in the same VM yet
-          result.put(rsId, rsInfo);
+          eval.accept(rsId, rsInfo);
         }
         else
         {
           // Skip: we have found some RSs in the same VM, but this RS is not.
+          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
+              localServerId));
         }
       }
+      else
+      {
+        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
+            localServerId));
+      }
     }
-    return result;
+    evals.keepBest(eval);
+  }
+
+  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
+      int localServerId)
+  {
+    for (ReplicationServerInfo acceptedRS : eval.getAccepted().values())
+    { // every RS accepted so far is rejected as not being in the DS VM
+      eval.reject(acceptedRS, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
+          acceptedRS.getServerId(), localServerId));
+    }
+  }
 
   /**
@@ -1699,23 +1919,18 @@
    * Warning: This method is expected to be called with at least 2 servers in
    * bestServers
    * Note: this method is static for test purpose (access from unit tests)
-   * @param bestServers The list of replication servers to consider
+   * @param evals The evaluation object
    * @param currentRsServerId The replication server the local server is
    *        currently connected to. -1 if the local server is not yet connected
    *        to any replication server.
    * @param localServerId The server id of the local server. This is not used
    *        when it is not connected to a replication server
    *        (currentRsServerId = -1)
-   * @return The replication server the local server should be connected to
-   * as far as the weight is concerned. This may be the currently used one if
-   * the weight is correctly spread. If the returned value is null, the best
-   * replication server is undetermined but the local server must disconnect
-   * (so the best replication server is another one than the current one).
    */
-  public static ReplicationServerInfo computeBestServerForWeight(
-    Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
-    int localServerId)
+  public static void computeBestServerForWeight(RSEvaluations evals,
+      int currentRsServerId, int localServerId)
   {
+    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
     /*
      * - Compute the load goal of each RS, deducing it from the weights affected
      * to them.
@@ -1765,18 +1980,21 @@
     {
       // The local server is not connected yet, find best server to connect to,
       // taking the weights into account.
-      return computeBestServerWhenNotConnected(bestServers, loadDistances);
+      computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
     }
-    // The local server is currently connected to a RS, let's see if it must
-    // disconnect or not, taking the weights into account.
-    return computeBestServerWhenConnected(bestServers, loadDistances,
-        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+    else
+    {
+      // The local server is currently connected to a RS, let's see if it must
+      // disconnect or not, taking the weights into account.
+      computeBestServerWhenConnected(evals, loadDistances, localServerId,
+          currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+    }
   }
 
-  private static ReplicationServerInfo computeBestServerWhenNotConnected(
-      Map<Integer, ReplicationServerInfo> bestServers,
-      Map<Integer, BigDecimal> loadDistances)
+  private static void computeBestServerWhenNotConnected(RSEvaluations evals,
+      Map<Integer, BigDecimal> loadDistances, int localServerId)
   {
+    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
     /*
      * Find the server with the current highest distance to its load goal and
      * choose it. Make an exception if every server is correctly balanced,
@@ -1815,14 +2033,15 @@
       // Choose server with the highest weight
       bestRsId = highestWeightRsId;
     }
-    return bestServers.get(bestRsId);
+    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
+        bestRsId));
   }
 
-  private static ReplicationServerInfo computeBestServerWhenConnected(
-      Map<Integer, ReplicationServerInfo> bestServers,
+  private static void computeBestServerWhenConnected(RSEvaluations evals,
       Map<Integer, BigDecimal> loadDistances, int localServerId,
       int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
   {
+    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
     final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
     float currentLoadDistance =
       loadDistances.get(currentRsServerId).floatValue();
@@ -1928,45 +2147,67 @@
           {
             // Avoid the yoyo effect, and keep the local DS connected to its
             // current RS
-            return bestServers.get(currentRsServerId);
+            evals.setBestRS(currentRsServerId,
+                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
+            return;
           }
         }
 
-        // Prepare a sorted list (from lowest to highest) or DS server ids
-        // connected to the current RS
         ReplicationServerInfo currentRsInfo =
-          bestServers.get(currentRsServerId);
-        List<Integer> serversConnectedToCurrentRS =
-            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
-        Collections.sort(serversConnectedToCurrentRS);
-
-        // Go through the list of DSs to disconnect and see if the local
-        // server is part of them.
-        int index = 0;
-        while (overloadingDSsNumber > 0)
+            bestServers.get(currentRsServerId);
+        if (isServerOverloadingRS(localServerId, currentRsInfo,
+            overloadingDSsNumber))
         {
-          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
-          if (serverIdToDisconnect == localServerId)
-          {
-            // The local server is part of the DSs to disconnect
-            return null;
-          }
-          overloadingDSsNumber--;
-          index++;
+          // The local server is part of the DSs to disconnect
+          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
+              localServerId, currentRsServerId));
         }
-
-        // The local server is not part of the servers to disconnect from the
-        // current RS.
+        else
+        {
+          // The local server is not part of the servers to disconnect from the
+          // current RS.
+          evals.setBestRS(currentRsServerId,
+              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
+                  currentRsServerId));
+        }
       } else {
         // The average distance of the other RSs does not show a lack of DSs:
         // no need to disconnect any DS from the current RS.
+        evals.setBestRS(currentRsServerId,
+            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
+                currentRsServerId));
       }
     } else {
       // The RS load goal is reached or there are not enough DSs connected to
       // it to reach it: do not disconnect from this RS and return rsInfo for
       // this RS
+      evals.setBestRS(currentRsServerId,
+          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
+              currentRsServerId));
     }
-    return bestServers.get(currentRsServerId);
+  }
+
+  /**
+   * Returns whether the local DS is one of the DSs overloading the RS.
+   * <p>
+   * The serverIds of the DSs connected to this RS are sorted in ascending
+   * order, so that all nodes in the topology use the same ordering. The DSs
+   * whose index in the sorted List is lower than "overloadingDSsNumber" are
+   * evicted first. This is unfair, since the lower serverIds are evicted more
+   * often than the higher ones, but it is consistent across the topology.
+   *
+   * @return true if the local DS is connected to this RS and is among the
+   *         "overloadingDSsNumber" first DSs of the sorted List
+   */
+  private static boolean isServerOverloadingRS(int localServerId,
+      ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
+  {
+    List<Integer> serversConnectedToCurrentRS =
+        new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
+    Collections.sort(serversConnectedToCurrentRS);
+
+    final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
+    return idx != -1 && idx < overloadingDSsNumber;
   }
 
   /**
@@ -2069,8 +2310,8 @@
 
     if (debugEnabled())
     {
-      TRACER.debugInfo(this + " end restart : connected=" + connected
-        + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
+      debugInfo("end restart : connected=" + connected + " with RS("
+          + getRsServerId() + ") genId=" + this.generationID);
     }
   }
 
@@ -2129,8 +2370,8 @@
 
         if (debugEnabled())
         {
-          TRACER.debugInfo("ReplicationBroker.publish() Publishing a "
-            + "message is not possible due to existing connection error.");
+          debugInfo("publish(): Publishing a message is not possible due to"
+              + " existing connection error.");
         }
 
         return false;
@@ -2232,8 +2473,8 @@
             // ignore
             if (debugEnabled())
             {
-              TRACER.debugInfo("ReplicationBroker.publish() "
-                + "Interrupted exception raised : " + e.getLocalizedMessage());
+              debugInfo("publish(): Interrupted exception raised : "
+                  + e.getLocalizedMessage());
             }
           }
         }
@@ -2242,8 +2483,8 @@
         // just loop.
         if (debugEnabled())
         {
-          TRACER.debugInfo("ReplicationBroker.publish() "
-            + "Interrupted exception raised." + e.getLocalizedMessage());
+          debugInfo("publish(): Interrupted exception raised."
+              + e.getLocalizedMessage());
         }
       }
     }
@@ -2393,10 +2634,10 @@
             {
               // Stable topology (no topo msg since few seconds): proceed with
               // best server checking.
-              final ReplicationServerInfo bestServerInfo =
+              final RSEvaluations evals =
                   computeBestReplicationServer(false, previousRsServerID, state,
-                      replicationServerInfos, serverId, getGroupId(),
-                      generationID);
+                  replicationServerInfos, serverId, getGroupId(), generationID);
+              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
               if (previousRsServerID != -1
                   && (bestServerInfo == null
                       || bestServerInfo.getServerId() != previousRsServerID))
@@ -2413,14 +2654,19 @@
                 }
                 else
                 {
-                  // TODO JNR log why an RS was evicted as best server
+                  final int bestRsServerId = bestServerInfo.getServerId();
                   message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                       serverId, previousRsServerID,
                       localSession.getReadableRemoteAddress(),
-                      bestServerInfo.getServerId(),
-                      baseDN.toNormalizedString());
+                      bestRsServerId,
+                      baseDN.toNormalizedString(),
+                      evals.getEvaluation(previousRsServerID).toString(),
+                      evals.getEvaluation(bestRsServerId).toString());
                 }
                 logError(message);
+                if (debugEnabled())
+                  debugInfo("best replication servers evaluation results: "
+                      + evals);
                 reStart(true);
               }
 
@@ -2472,14 +2718,12 @@
   }
 
   /**
-   * Gets the States of all the Replicas currently in the
-   * Topology.
-   * When this method is called, a Monitoring message will be sent
-   * to the Replication Server to which this domain is currently connected
-   * so that it computes a table containing information about
-   * all Directory Servers in the topology.
-   * This Computation involves communications will all the servers
-   * currently connected and
+   * Gets the States of all the Replicas currently in the Topology. When this
+   * method is called, a Monitoring message will be sent to the Replication
+   * Server to which this domain is currently connected so that it computes a
+   * table containing information about all Directory Servers in the topology.
+   * This computation involves communications with all the servers currently
+   * connected.
    *
    * @return The States of all Replicas in the topology (except us)
    */
@@ -2539,9 +2783,8 @@
   public void stop()
   {
     if (debugEnabled())
-      TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
-          + " and will close the connection to replication server "
-          + rsServerId + " for domain " + getBaseDN());
+      debugInfo("is stopping and will close the connection to"
+          + " replication server " + rsServerId);
 
     synchronized (startStopLock)
     {
@@ -2786,7 +3029,7 @@
   public void receiveTopo(TopologyMsg topoMsg)
   {
     if (debugEnabled())
-      TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
+      debugInfo("receive TopologyMsg=" + topoMsg);
 
     // Store new DS list
     dsList = topoMsg.getDsList();
@@ -2876,8 +3119,7 @@
     else
     {
       if (debugEnabled())
-        TRACER.debugInfo(this
-          + " is not configured to send CSN heartbeat interval");
+        debugInfo("is not configured to send CSN heartbeat interval");
     }
   }
 
@@ -3003,4 +3245,10 @@
     }
     return sb.toString();
   }
+
+  private void debugInfo(String message)
+  {
+    TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
+        + " and serverId=" + getServerId() + " " + message);
+  }
 }

--
Gitblit v1.10.0