mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.50.2013 950cfbfa7d895b728f432500027df274d6b1d6c7
OPENDJ-66 (CR-1365) DS does not failover between replication servers in different groups when configured explicitly for one of the groups 

Code cleanup.

ReplicationBroker.java
In computeBestReplicationServer(), fixed the screwed up for (ctr) switch (ctr){}; pattern + added keepBest() method.
In computeBestServerForWeight(), used BigDecimal.valueOf() instead of ctor.
In changeConfig(), fixed complicated condition.
Added javadocs, fixed variable naming, used foreach.

StaticUtils.java:
Added toIterable().
2 files modified
373 ■■■■■ changed files
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 277 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/util/StaticUtils.java 96 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -23,17 +23,14 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
import java.net.UnknownHostException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.collectionToString;
import static org.opends.server.util.StaticUtils.isLocalAddress;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.math.BigDecimal;
@@ -45,6 +42,7 @@
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -70,9 +68,9 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
/**
 * The broker for Multi-master Replication.
@@ -86,7 +84,10 @@
  private static final DebugTracer TRACER = getTracer();
  private volatile boolean shutdown = false;
  private final Object startStopLock = new Object();
  private volatile Collection<String> servers;
  /**
   * Replication server URLs under this format: "<code>hostname:port</code>".
   */
  private volatile Collection<String> replicationServerUrls;
  private volatile boolean connected = false;
  private volatile String replicationServer = "Not connected";
  private volatile ProtocolSession session = null;
@@ -101,15 +102,15 @@
  private int timeout = 0;
  private short protocolVersion;
  private ReplSessionSecurity replSessionSecurity;
  // My group id
  private byte groupId = (byte) -1;
  // The group id of the RS we are connected to
  private byte rsGroupId = (byte) -1;
  // The server id of the RS we are connected to
  /** My group id. */
  private byte groupId = -1;
  /** The group id of the RS we are connected to. */
  private byte rsGroupId = -1;
  /** The server id of the RS we are connected to. */
  private Integer rsServerId = -1;
  // The server URL of the RS we are connected to
  /** The server URL of the RS we are connected to. */
  private String rsServerUrl = null;
  // Our replication domain
  /** Our replication domain. */
  private ReplicationDomain domain = null;
  /**
   * This object is used as a conditional event to be notified about
@@ -182,7 +183,7 @@
   * received, it is incremented. When it reaches 2, we run the checking
   * algorithm to see if we must reconnect to another best replication server.
   * Then we reset the value to 0. But when a topology message is received, the
   * integer is reseted to 0. This insures that we wait at least one monitoring
   * integer is reseted to 0. This ensures that we wait at least one monitoring
   * publisher period before running the algorithm, but also that we wait at
   * least for a monitoring period after the last received topology message
   * (topology stabilization).
@@ -247,19 +248,17 @@
  /**
   * Start the ReplicationBroker.
   *
   * @param servers list of servers used
   * @param replicationServers list of servers used
   */
  public void start(Collection<String> servers)
  public void start(Collection<String> replicationServers)
  {
    synchronized (startStopLock)
    {
      /*
       * Open Socket to the ReplicationServer Send the Start message
       */
      // Open Socket to the ReplicationServer Send the Start message
      shutdown = false;
      this.servers = servers;
      this.replicationServerUrls = replicationServers;
      if (servers.size() < 1)
      if (this.replicationServerUrls.size() < 1)
      {
        Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
        logError(message);
@@ -348,7 +347,7 @@
      replicationServerInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : servers)
    for (String serverUrl : replicationServerUrls)
    {
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
@@ -428,12 +427,10 @@
    }
    // Now compare addresses, if at least one match, this is the same server
    for (int i = 0; i < rs1Addresses.length; i++)
    for (InetAddress inetAddress1 : rs1Addresses)
    {
      InetAddress inetAddress1 = rs1Addresses[i];
      for (int j = 0; j < rs2Addresses.length; j++)
      for (InetAddress inetAddress2 : rs2Addresses)
      {
        InetAddress inetAddress2 = rs2Addresses[j];
        if (inetAddress2.equals(inetAddress1))
        {
          return true;
@@ -454,7 +451,7 @@
  {
    private short protocolVersion;
    private long generationId;
    private byte groupId = (byte) -1;
    private byte groupId = -1;
    private int serverId;
    // Received server URL
    private String serverURL;
@@ -517,8 +514,11 @@
    }
    /**
     * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
     * Constructs a ReplicationServerInfo object wrapping a
     * {@link ReplServerStartMsg}.
     *
     * @param replServerStartMsg
     *          The {@link ReplServerStartMsg} this object will wrap.
     */
    private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
    {
@@ -537,9 +537,10 @@
    /**
     * Constructs a ReplicationServerInfo object wrapping a
     * ReplServerStartDSMsg.
     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
     * wrap.
     * {@link ReplServerStartDSMsg}.
     *
     * @param replServerStartDSMsg
     *          The {@link ReplServerStartDSMsg} this object will wrap.
     */
    private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    {
@@ -756,9 +757,11 @@
     * Returns a string representation of this object.
     * @return A string representation of this object.
     */
    @Override
    public String toString()
    {
      return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
      return "Url:" + this.serverURL + " ServerId:" + this.serverId
          + " GroupId:" + this.groupId;
    }
  }
@@ -781,15 +784,14 @@
   */
  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
  {
    Map<Integer, ReplicationServerInfo> rsInfos =
      new ConcurrentHashMap<Integer, ReplicationServerInfo>();
    for (String server : servers)
    for (String serverUrl : replicationServerUrls)
    {
      // Connect to server and get info about it
      ReplicationServerInfo replicationServerInfo =
        performPhaseOneHandshake(server, false, false);
        performPhaseOneHandshake(serverUrl, false, false);
      // Store server info in list
      if (replicationServerInfo != null)
@@ -802,20 +804,27 @@
  }
  /**
   * Special aspects of connecting as ECL compared to connecting as data server
   * are :
   * - 1 single RS configured
   * - so no choice of the preferred RS
   * - ?? Heartbeat
   * - Start handshake is :
   * Special aspects of connecting as ECL (External Change Log) compared to
   * connecting as data server are :
   * <ul>
   * <li>1 single RS configured</li>
   * <li>so no choice of the preferred RS</li>
   * <li>?? Heartbeat</li>
   * <li>Start handshake is :
   *
   * <pre>
   *    Broker ---> StartECLMsg       ---> RS
   *          <---- ReplServerStartMsg ---
   *           ---> StartSessionECLMsg --> RS
   * </pre>
   *
   * </li>
   * </ul>
   */
  private void connectAsECL()
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    String bestServer = this.replicationServerUrls.iterator().next();
    if (performPhaseOneHandshake(bestServer, true, true) != null)
    {
@@ -1323,17 +1332,7 @@
          localSession.close();
        }
        if (socket != null)
        {
          try
          {
            socket.close();
          }
          catch (IOException e)
          {
            // Ignore.
          }
        }
        close(socket);
      }
      if (!hasConnected && errorMessage != null)
@@ -1361,9 +1360,9 @@
  /**
   * Performs the second phase handshake (send StartSessionMsg and receive
   * TopologyMsg messages exchange) and return the reply message from the
   * replication server.
   * Performs the second phase handshake for External Change Log (send
   * StartSessionMsg and receive TopologyMsg messages exchange) and return the
   * reply message from the replication server.
   *
   * @param server Server we are connecting with.
   * @param initStatus The status we are starting with
@@ -1535,58 +1534,34 @@
     * - replication server in the same VM as local DS one
     */
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    Map<Integer, ReplicationServerInfo> newBestServers;
    // The list of best replication servers is filtered with each criteria. At
    // each criteria, the list is replaced with the filtered one if some there
    // each criteria, the list is replaced with the filtered one if there
    // are some servers from the filtering, otherwise, the list is left as is
    // and the new filtering for the next criteria is applied and so on.
    for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
    // Use only servers locally configured: those are servers declared in
    // the local configuration. When the current method is called, for
    // sure, at least one server from the list is locally configured
    bestServers =
        keepBest(filterServersLocallyConfigured(bestServers), bestServers);
    // Some servers with same group id ?
    bestServers =
        keepBest(filterServersWithSameGroupId(bestServers, groupId),
            bestServers);
    // Some servers with same generation id ?
    Map<Integer, ReplicationServerInfo> sameGenerationId =
        filterServersWithSameGenerationId(bestServers, generationId);
    if (sameGenerationId.size() > 0)
    {
      newBestServers = null;
      switch (filterLevel)
      {
        case 1:
          // Use only servers locally configured: those are servers declared in
          // the local configuration. When the current method is called, for
          // sure, at least one server from the list is locally configured
          bestServers = filterServersLocallyConfigured(bestServers);
          break;
        case 2:
          // Some servers with same group id ?
          newBestServers = filterServersWithSameGroupId(bestServers, groupId);
          if (newBestServers.size() > 0)
          {
            bestServers = newBestServers;
          }
          break;
        case 3:
          // Some servers with same generation id ?
          newBestServers = filterServersWithSameGenerationId(bestServers,
            generationId);
          if (newBestServers.size() > 0)
          {
            // Ok some servers with the right generation id
            bestServers = newBestServers;
            // If some servers with the right generation id this is useful to
            // run the local DS change criteria
            newBestServers = filterServersWithAllLocalDSChanges(bestServers,
              myState, localServerId);
            if (newBestServers.size() > 0)
            {
              bestServers = newBestServers;
            }
          }
          break;
        case 4:
          // Some servers in the local VM ?
          newBestServers = filterServersInSameVM(bestServers);
          if (newBestServers.size() > 0)
          {
            bestServers = newBestServers;
          }
          break;
      }
      // If some servers with the right generation id this is useful to
      // run the local DS change criteria
      bestServers =
          keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
              myState, localServerId), sameGenerationId);
    }
    // Some servers in the local VM ?
    bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
    /**
     * Now apply the choice base on the weight to the best servers list
@@ -1612,6 +1587,23 @@
  }
  /**
   * If the filtered Map is not empty then it is returned, else return the
   * original unfiltered Map.
   *
   * @return the best fit Map between the filtered Map and the original
   * unfiltered Map.
   */
  private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
      Map<K, V> unfilteredMap)
  {
    if (!filteredMap.isEmpty())
    {
      return filteredMap;
    }
    return unfilteredMap;
  }
  /**
   * Creates a new list that contains only replication servers that are locally
   * configured.
   * @param bestServers The list of replication servers to filter
@@ -1867,24 +1859,22 @@
    // key:server id, value: distance
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the dot)
    // Default value of rounding method is HALF_UP for
    // the MathContext
    MathContext mathContext = new MathContext(32);
    MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      int rsWeight = replicationServerInfo.getWeight();
      //  load goal = rs weight / sum of weights
      BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
        new BigDecimal(sumOfWeights), mathContext);
      BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
        BigDecimal.valueOf(sumOfWeights), mathContext);
      BigDecimal currentLoadBd = BigDecimal.ZERO;
      if (sumOfConnectedDSs != 0)
      {
        // current load = number of connected DSs / total number of DSs
        int connectedDSs = replicationServerInfo.getConnectedDSNumber();
        currentLoadBd = (new BigDecimal(connectedDSs)).divide(
        new BigDecimal(sumOfConnectedDSs), mathContext);
        currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
          BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
      }
      // load distance = load goal - current load
      BigDecimal loadDistanceBd =
@@ -1916,7 +1906,7 @@
          bestRsId = rsId;
          highestDistance = loadDistance;
        }
        if (loadDistance != (float)0)
        if (loadDistance != 0)
        {
          allRsWithZeroDistance = false;
        }
@@ -1931,7 +1921,7 @@
      // All servers with a 0 distance ?
      if (allRsWithZeroDistance)
      {
        // Choose server withe the highest weight
        // Choose server with the highest weight
        bestRsId = highestWeightRsId;
      }
      return bestServers.get(bestRsId);
@@ -1942,7 +1932,7 @@
      float currentLoadDistance =
        loadDistances.get(currentRsServerId).floatValue();
      if (currentLoadDistance < (float) 0)
      if (currentLoadDistance < 0)
      {
        // Too much DSs connected to the current RS, compared with its load
        // goal:
@@ -1960,7 +1950,7 @@
          }
        }
        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
        {
          // The average distance of the other RSs shows a lack of DSs.
          // Compute the number of DSs to disconnect from the current RS,
@@ -1977,8 +1967,8 @@
          // current situation, otherwise the DS would keep move between the 2
          // RSs
          float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
            multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
            floatValue();
            multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
                .floatValue();
          int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
          // Avoid yoyo effect
@@ -1990,34 +1980,34 @@
              bestServers.get(currentRsServerId);
            int currentRsWeight = currentReplicationServerInfo.getWeight();
            BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
            BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
            BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
            BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
            BigDecimal currentRsLoadGoalBd =
              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
            BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
            BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
            if (sumOfConnectedDSs != 0)
            {
              int connectedDSs = currentReplicationServerInfo.
                getConnectedDSNumber();
              BigDecimal potentialNewConnectedDSsBd =
                new BigDecimal(connectedDSs - 1);
                  BigDecimal.valueOf(connectedDSs - 1);
              BigDecimal sumOfConnectedDSsBd =
                new BigDecimal(sumOfConnectedDSs);
                  BigDecimal.valueOf(sumOfConnectedDSs);
              potentialCurrentRsNewLoadBd =
                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
                  mathContext);
            }
            BigDecimal potentialCurrentRsNewLoadDistanceBd =
              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);
                mathContext);
            // What would be the new load distance for the other RSs ?
            BigDecimal additionalDsLoadBd =
              (new BigDecimal(1)).divide(
              new BigDecimal(sumOfConnectedDSs), mathContext);
                BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs),mathContext);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
              mathContext);
                    mathContext);
            // Now compare both values: we must no disconnect the DS if this
            // is for going in a situation where the load distance of the other
@@ -2139,7 +2129,7 @@
    if (failingSession == session)
    {
      connected = false;
      rsGroupId = (byte) -1;
      rsGroupId = -1;
      rsServerId = -1;
      rsServerUrl = null;
      session = null;
@@ -2291,8 +2281,7 @@
          // connectPhaseLock because it can be blocking and we don't
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
            (long) 500, TimeUnit.MILLISECONDS);
            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
        } else
        {
          credit = true;
@@ -2466,14 +2455,12 @@
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          // Extract and store replicas ServerStates
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          for (int srvId : toIterable(monitorMsg.ldapIterator()))
          {
            int srvId = it.next();
            replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
          }
@@ -2485,10 +2472,8 @@
          }
          // Update the replication servers ServerStates with new received info
          it = monitorMsg.rsIterator();
          while (it.hasNext())
          for (int srvId : toIterable(monitorMsg.rsIterator()))
          {
            int srvId = it.next();
            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
            if (rsInfo != null)
            {
@@ -2662,7 +2647,7 @@
      stopRSHeartBeatMonitoring();
      stopChangeTimeHeartBeatPublishing();
      replicationServer = "stopped";
      rsGroupId = (byte) -1;
      rsGroupId = -1;
      rsServerId = -1;
      rsServerUrl = null;
      if (session != null)
@@ -2781,17 +2766,17 @@
    // A new session is necessary only when information regarding
    // the connection is modified
    if ((servers == null) ||
      (!(replicationServers.size() == servers.size() && replicationServers.
      containsAll(servers))) ||
      window != this.maxRcvWindow ||
      heartbeatInterval != this.heartbeatInterval ||
      (groupId != this.groupId))
    if (this.replicationServerUrls == null
        || replicationServers.size() != this.replicationServerUrls.size()
        || !replicationServers.containsAll(this.replicationServerUrls)
        || window != this.maxRcvWindow
        || heartbeatInterval != this.heartbeatInterval
        || groupId != this.groupId)
    {
      needToRestartSession = true;
    }
    this.servers = replicationServers;
    this.replicationServerUrls = replicationServers;
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
opends/src/server/org/opends/server/util/StaticUtils.java
@@ -27,21 +27,41 @@
 */
package org.opends.server.util;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
import java.io.*;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
import java.util.StringTokenizer;
import java.util.TimeZone;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -66,6 +86,11 @@
import org.opends.server.util.args.Argument;
import org.opends.server.util.args.ArgumentException;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
/**
 * This class defines a number of static utility methods that may be used
@@ -4612,5 +4637,60 @@
      }
    }
  }
  /**
   * Returns an {@link Iterable} returning the passed in {@link Iterator}. THis
   * allows using methods returning Iterators with foreach statements.
   * <p>
   * For example, consider a method with this signature:
   * <p>
   * <code>public Iterator&lt;String&gt; myIteratorMethod();</code>
   * <p>
   * Classical use with for or while loop:
   *
   * <pre>
   * for (Iterator&lt;String&gt; it = myIteratorMethod(); it.hasNext();)
   * {
   *   String s = it.next();
   *   // use it
   * }
   *
   * Iterator&lt;String&gt; it = myIteratorMethod();
   * while(it.hasNext();)
   * {
   *   String s = it.next();
   *   // use it
   * }
   * </pre>
   *
   * Improved use with foreach:
   *
   * <pre>
   * for (String s : StaticUtils.toIterable(myIteratorMethod()))
   * {
   * }
   * </pre>
   *
   * </p>
   *
   * @param <T>
   *          the generic type of the passed in Iterator and for the returned
   *          Iterable.
   * @param iterator
   *          the Iterator that will be returned by the Iterable.
   * @return an Iterable returning the passed in Iterator
   */
  public static <T> Iterable<T> toIterable(final Iterator<T> iterator)
  {
    return new Iterable<T>()
    {
      @Override
      public Iterator<T> iterator()
      {
        return iterator;
      }
    };
  }
}