mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
30.38.2013 d0ec790931dcb0ad6ff6059cf7196ec3dc5b7921
Partial fix for OPENDJ-875: Use of hostnames in replication protocol causes failover problems

This partial fix addresses the first two bugs described in the issue comments, specifically:

* the logic (ReplicationBroker#filterServersInSameVM) which chooses the best RS does not prefer RSs which are on the same host, only those which are in the same VM. In addition, the logic in ReplicationServer#isLocalReplicationServerPort assumes that the local host name corresponds to the address on which the RS is listening.

* the method ReplicationBroker#isSameReplicationServerUrl contains a typo which causes the second URL to be interpreted as the local host name if the first URL is a local address. This triggers all kinds of strange side effects during RS fail-over.
2 files modified
156 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 49 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 107 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -165,6 +165,7 @@
  private ECLWorkflowElement eclwe;
  private WorkflowImpl externalChangeLogWorkflowImpl = null;
  // FIXME: why is this a set of ports? Do we claim to support multiple ports?
  private static HashSet<Integer> localPorts = new HashSet<Integer>();
  // Monitors for synchronizing domain creation with the connect thread.
@@ -245,7 +246,7 @@
    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
    replSessionSecurity = new ReplSessionSecurity();
    initialize(replicationPort);
    initialize();
    configuration.addChangeListener(this);
    try
    {
@@ -521,12 +522,8 @@
  /**
   * initialization function for the replicationServer.
   *
   * @param  changelogPort     The port on which the replicationServer should
   *                           listen.
   *
   */
  private void initialize(int changelogPort)
  private void initialize()
  {
    shutdown = false;
@@ -542,9 +539,9 @@
       * Open replicationServer socket
       */
      String localhostname = InetAddress.getLocalHost().getHostName();
      serverURL = localhostname + ":" + String.valueOf(changelogPort);
      serverURL = localhostname + ":" + String.valueOf(replicationPort);
      listenSocket = new ServerSocket();
      listenSocket.bind(new InetSocketAddress(changelogPort));
      listenSocket.bind(new InetSocketAddress(replicationPort));
      /*
       * creates working threads
@@ -593,7 +590,7 @@
    } catch (IOException e)
    {
      Message message =
          ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
          ERR_COULD_NOT_BIND_CHANGELOG.get(replicationPort, e.getMessage());
      logError(message);
    } catch (DirectoryException e)
    {
@@ -1339,7 +1336,7 @@
                                boolean successful)
  {
    if (backend.getBackendID().equals(backendId))
      initialize(this.replicationPort);
      initialize();
  }
  /**
@@ -1602,33 +1599,17 @@
  }
  /**
   * This method allows to check if the Replication Server given
   * as the parameter is running in the local JVM.
   * Returns {@code true} if the provided port is one of the ports that this
   * replication server is listening on.
   *
   * @param server   The Replication Server that should be checked.
   *
   * @return         a boolean indicating if the Replication Server given
   *                 as the parameter is running in the local JVM.
   * @param port
   *          The port to be checked.
   * @return {@code true} if the provided port is one of the ports that this
   *         replication server is listening on.
   */
  public static boolean isLocalReplicationServer(String server)
  public static boolean isLocalReplicationServerPort(int port)
  {
    int separator = server.lastIndexOf(':');
    if (separator == -1)
      return false;
    int port = Integer.parseInt(server.substring(separator + 1));
    String hostname = server.substring(0, separator);
    try
    {
      InetAddress localAddr = InetAddress.getLocalHost();
      return localPorts.contains(port)
          && (InetAddress.getByName(hostname).isLoopbackAddress() ||
          InetAddress.getByName(hostname).equals(localAddr));
    } catch (UnknownHostException e)
    {
      return false;
    }
    return localPorts.contains(port);
  }
  /**
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.server.ReplicationServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
@@ -67,7 +68,6 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
@@ -372,7 +372,7 @@
   * replication server instance, false otherwise.
   */
  private static boolean isSameReplicationServerUrl(String rs1Url,
    String rs2Url)
      String rs2Url)
  {
    // Get and compare ports of RS1 and RS2
    int separator1 = rs1Url.lastIndexOf(':');
@@ -397,46 +397,54 @@
    }
    // Get and compare addresses of RS1 and RS2
    String rs1 = rs1Url.substring(0, separator1);
    InetAddress[] rs1Addresses;
    final String rs1 = rs1Url.substring(0, separator1);
    final InetAddress[] rs1Addresses;
    try
    {
      if (isLocalAddress(rs1))
      {
        // Replace localhost with the local official hostname
        rs1 = InetAddress.getLocalHost().getHostName();
      }
      rs1Addresses = InetAddress.getAllByName(rs1);
    } catch (UnknownHostException ex)
      // Normalize local address to null.
      rs1Addresses = isLocalAddress(rs1) ? null : InetAddress.getAllByName(rs1);
    }
    catch (UnknownHostException ex)
    {
      // Unknown RS: should not happen
      return false;
    }
    String rs2 = rs2Url.substring(0, separator2);
    InetAddress[] rs2Addresses;
    final String rs2 = rs2Url.substring(0, separator2);
    final InetAddress[] rs2Addresses;
    try
    {
      if (isLocalAddress(rs1))
      {
        // Replace localhost with the local official hostname
        rs2 = InetAddress.getLocalHost().getHostName();
      }
      rs2Addresses = InetAddress.getAllByName(rs2);
    } catch (UnknownHostException ex)
      // Normalize local address to null.
      rs2Addresses = isLocalAddress(rs2) ? null : InetAddress.getAllByName(rs2);
    }
    catch (UnknownHostException ex)
    {
      // Unknown RS: should not happen
      return false;
    }
    // Now compare addresses, if at least one match, this is the same server
    for (InetAddress inetAddress1 : rs1Addresses)
    // Now compare addresses: if at least one matches, this is the same server.
    if (rs1Addresses == null && rs2Addresses == null)
    {
      for (InetAddress inetAddress2 : rs2Addresses)
      // Both local addresses.
      return true;
    }
    else if (rs1Addresses == null || rs2Addresses == null)
    {
      // One local address and one non-local.
      return false;
    }
    else
    {
      // Both non-local addresses: check for overlap.
      for (InetAddress inetAddress1 : rs1Addresses)
      {
        if (inetAddress2.equals(inetAddress1))
        for (InetAddress inetAddress2 : rs2Addresses)
        {
          return true;
          if (inetAddress2.equals(inetAddress1))
          {
            return true;
          }
        }
      }
    }
@@ -1564,8 +1572,8 @@
          keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
              myState, localServerId), sameGenerationId);
    }
    // Some servers in the local VM ?
    bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
    // Some servers in the local VM or local host?
    bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
    /**
     * Now apply the choice based on the weight to the best servers list
@@ -1795,26 +1803,47 @@
    }
  }
  /**
   * Creates a new list that contains only replication servers that are in the
   * same VM as the local DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers being in the same VM as the
   * local DS (which may be empty)
   * Creates a new list that contains only replication servers that are on the
   * same host as the local DS, from a passed replication server list. This
   * method gives priority to any replication server which is in the same
   * VM as this DS.
   *
   * @param bestServers
   *          The list of replication servers to filter
   * @return The sub list of replication servers being on the same host as the
   *         local DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
    Map<Integer, ReplicationServerInfo> bestServers)
  private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
      Map<Integer, ReplicationServerInfo> bestServers)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
        new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (ReplicationServer.isLocalReplicationServer(
        replicationServerInfo.getServerURL()))
      String server = replicationServerInfo.getServerURL();
      int separator = server.lastIndexOf(':');
      if (separator > 0)
      {
        result.put(rsId, replicationServerInfo);
        String hostname = server.substring(0, separator);
        if (isLocalAddress(hostname))
        {
          int port = Integer.parseInt(server.substring(separator + 1));
          if (isLocalReplicationServerPort(port))
          {
            // An RS in the same VM will always have priority.
            result.clear();
            result.put(rsId, replicationServerInfo);
            break;
          }
          else
          {
            result.put(rsId, replicationServerInfo);
          }
        }
      }
    }
    return result;