From 3e4eefdcd60043206832b3bd5a00409810c6d9b6 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 30 May 2013 16:38:37 +0000
Subject: [PATCH] Partial fix for OPENDJ-875: Use of hostnames in replication protocol causes failover problems

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java  |   49 +++++-----------
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  107 ++++++++++++++++++++++-------------
 2 files changed, 83 insertions(+), 73 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 0e75c5e..aa280d0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -165,6 +165,7 @@
   private ECLWorkflowElement eclwe;
   private WorkflowImpl externalChangeLogWorkflowImpl = null;
 
+  // FIXME: why is this a set of ports? Do we claim to support multiple ports?
   private static HashSet<Integer> localPorts = new HashSet<Integer>();
 
   // Monitors for synchronizing domain creation with the connect thread.
@@ -245,7 +246,7 @@
     monitoringPublisherPeriod = configuration.getMonitoringPeriod();
 
     replSessionSecurity = new ReplSessionSecurity();
-    initialize(replicationPort);
+    initialize();
     configuration.addChangeListener(this);
     try
     {
@@ -521,12 +522,8 @@
 
   /**
    * initialization function for the replicationServer.
-   *
-   * @param  changelogPort     The port on which the replicationServer should
-   *                           listen.
-   *
    */
-  private void initialize(int changelogPort)
+  private void initialize()
   {
     shutdown = false;
 
@@ -542,9 +539,9 @@
        * Open replicationServer socket
        */
       String localhostname = InetAddress.getLocalHost().getHostName();
-      serverURL = localhostname + ":" + String.valueOf(changelogPort);
+      serverURL = localhostname + ":" + String.valueOf(replicationPort);
       listenSocket = new ServerSocket();
-      listenSocket.bind(new InetSocketAddress(changelogPort));
+      listenSocket.bind(new InetSocketAddress(replicationPort));
 
       /*
        * creates working threads
@@ -593,7 +590,7 @@
     } catch (IOException e)
     {
       Message message =
-          ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
+          ERR_COULD_NOT_BIND_CHANGELOG.get(replicationPort, e.getMessage());
       logError(message);
     } catch (DirectoryException e)
     {
@@ -1339,7 +1336,7 @@
                                 boolean successful)
   {
     if (backend.getBackendID().equals(backendId))
-      initialize(this.replicationPort);
+      initialize();
   }
 
   /**
@@ -1602,33 +1599,17 @@
   }
 
   /**
-   * This method allows to check if the Replication Server given
-   * as the parameter is running in the local JVM.
+   * Returns {@code true} if the provided port is one of the ports that this
+   * replication server is listening on.
    *
-   * @param server   The Replication Server that should be checked.
-   *
-   * @return         a boolean indicating if the Replication Server given
-   *                 as the parameter is running in the local JVM.
+   * @param port
+   *          The port to be checked.
+   * @return {@code true} if the provided port is one of the ports that this
+   *         replication server is listening on.
    */
-  public static boolean isLocalReplicationServer(String server)
+  public static boolean isLocalReplicationServerPort(int port)
   {
-    int separator = server.lastIndexOf(':');
-    if (separator == -1)
-      return false;
-    int port = Integer.parseInt(server.substring(separator + 1));
-    String hostname = server.substring(0, separator);
-    try
-    {
-      InetAddress localAddr = InetAddress.getLocalHost();
-
-      return localPorts.contains(port)
-          && (InetAddress.getByName(hostname).isLoopbackAddress() ||
-          InetAddress.getByName(hostname).equals(localAddr));
-
-    } catch (UnknownHostException e)
-    {
-      return false;
-    }
+    return localPorts.contains(port);
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 6d4c018..bc1b1de 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.replication.server.ReplicationServer.*;
 import static org.opends.server.util.StaticUtils.*;
 
 import java.io.IOException;
@@ -67,7 +68,6 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.ServerConstants;
 
@@ -372,7 +372,7 @@
    * replication server instance, false otherwise.
    */
   private static boolean isSameReplicationServerUrl(String rs1Url,
-    String rs2Url)
+      String rs2Url)
   {
     // Get and compare ports of RS1 and RS2
     int separator1 = rs1Url.lastIndexOf(':');
@@ -397,46 +397,54 @@
     }
 
     // Get and compare addresses of RS1 and RS2
-    String rs1 = rs1Url.substring(0, separator1);
-    InetAddress[] rs1Addresses;
+    final String rs1 = rs1Url.substring(0, separator1);
+    final InetAddress[] rs1Addresses;
     try
     {
-      if (isLocalAddress(rs1))
-      {
-        // Replace localhost with the local official hostname
-        rs1 = InetAddress.getLocalHost().getHostName();
-      }
-      rs1Addresses = InetAddress.getAllByName(rs1);
-    } catch (UnknownHostException ex)
+      // Normalize local address to null.
+      rs1Addresses = isLocalAddress(rs1) ? null : InetAddress.getAllByName(rs1);
+    }
+    catch (UnknownHostException ex)
     {
       // Unknown RS: should not happen
       return false;
     }
 
-    String rs2 = rs2Url.substring(0, separator2);
-    InetAddress[] rs2Addresses;
+    final String rs2 = rs2Url.substring(0, separator2);
+    final InetAddress[] rs2Addresses;
     try
     {
-      if (isLocalAddress(rs1))
-      {
-        // Replace localhost with the local official hostname
-        rs2 = InetAddress.getLocalHost().getHostName();
-      }
-      rs2Addresses = InetAddress.getAllByName(rs2);
-    } catch (UnknownHostException ex)
+      // Normalize local address to null.
+      rs2Addresses = isLocalAddress(rs2) ? null : InetAddress.getAllByName(rs2);
+    }
+    catch (UnknownHostException ex)
     {
       // Unknown RS: should not happen
       return false;
     }
 
-    // Now compare addresses, if at least one match, this is the same server
-    for (InetAddress inetAddress1 : rs1Addresses)
+    // Now compare addresses, if at least one match, this is the same server.
+    if (rs1Addresses == null && rs2Addresses == null)
     {
-      for (InetAddress inetAddress2 : rs2Addresses)
+      // Both local addresses.
+      return true;
+    }
+    else if (rs1Addresses == null || rs2Addresses == null)
+    {
+      // One local address and one non-local.
+      return false;
+    }
+    else
+    {
+      // Both non-local addresses: check for overlap.
+      for (InetAddress inetAddress1 : rs1Addresses)
       {
-        if (inetAddress2.equals(inetAddress1))
+        for (InetAddress inetAddress2 : rs2Addresses)
         {
-          return true;
+          if (inetAddress2.equals(inetAddress1))
+          {
+            return true;
+          }
         }
       }
     }
@@ -1564,8 +1572,8 @@
           keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
               myState, localServerId), sameGenerationId);
     }
-    // Some servers in the local VM ?
-    bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
+    // Some servers in the local VM or local host?
+    bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
 
     /**
      * Now apply the choice base on the weight to the best servers list
@@ -1795,26 +1803,47 @@
     }
   }
 
+
+
   /**
-   * Creates a new list that contains only replication servers that are in the
-   * same VM as the local DS, from a passed replication server list.
-   * @param bestServers The list of replication servers to filter
-   * @return The sub list of replication servers being in the same VM as the
-   * local DS (which may be empty)
+   * Creates a new list that contains only replication servers that are on the
+   * same host as the local DS, from a passed replication server list. This
+   * method will gives priority to any replication server which is in the same
+   * VM as this DS.
+   *
+   * @param bestServers
+   *          The list of replication servers to filter
+   * @return The sub list of replication servers being on the same host as the
+   *         local DS (which may be empty)
    */
-  private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
-    Map<Integer, ReplicationServerInfo> bestServers)
+  private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
+      Map<Integer, ReplicationServerInfo> bestServers)
   {
     Map<Integer, ReplicationServerInfo> result =
-      new HashMap<Integer, ReplicationServerInfo>();
-
+        new HashMap<Integer, ReplicationServerInfo>();
     for (Integer rsId : bestServers.keySet())
     {
       ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-      if (ReplicationServer.isLocalReplicationServer(
-        replicationServerInfo.getServerURL()))
+      String server = replicationServerInfo.getServerURL();
+      int separator = server.lastIndexOf(':');
+      if (separator > 0)
       {
-        result.put(rsId, replicationServerInfo);
+        String hostname = server.substring(0, separator);
+        if (isLocalAddress(hostname))
+        {
+          int port = Integer.parseInt(server.substring(separator + 1));
+          if (isLocalReplicationServerPort(port))
+          {
+            // An RS in the same VM will always have priority.
+            result.clear();
+            result.put(rsId, replicationServerInfo);
+            break;
+          }
+          else
+          {
+            result.put(rsId, replicationServerInfo);
+          }
+        }
       }
     }
     return result;

--
Gitblit v1.10.0