From 3d5b2a62fbad7b6fdd6a43a2f654d675c2de479a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 05 Jul 2011 16:11:32 +0000
Subject: [PATCH] Fix OPENDJ-244: Replication fails when replication server is configured for a network interface which is not an alias of localhost/127.0.0.1

---
 opends/src/server/org/opends/server/util/StaticUtils.java                      |   91 +++++++++++++++---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java  |  166 +++++++++++++++-----------------
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |    5 
 3 files changed, 158 insertions(+), 104 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c529d5a..1f87290 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -26,22 +26,22 @@
  *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
+
+
+
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.ServerConstants.EOL;
 import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.isLocalAddress;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
+import java.net.*;
 import java.util.*;
 
 import org.opends.messages.Category;
@@ -53,39 +53,15 @@
 import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.BackupTaskListener;
-import org.opends.server.api.ExportTaskListener;
-import org.opends.server.api.ImportTaskListener;
-import org.opends.server.api.RestoreTaskListener;
-import org.opends.server.api.VirtualAttributeProvider;
+import org.opends.server.api.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.WorkflowImpl;
 import org.opends.server.core.networkgroups.NetworkGroup;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.*;
-import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ServerStartECLMsg;
-import org.opends.server.replication.protocol.ServerStartMsg;
-import org.opends.server.replication.protocol.StartECLSessionMsg;
-import org.opends.server.replication.protocol.StartMsg;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.BackupConfig;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DN;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.RestoreConfig;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchFilter;
-import org.opends.server.types.VirtualAttributeRule;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
 import org.opends.server.util.LDIFReader;
 import org.opends.server.util.ServerConstants;
 import org.opends.server.util.TimeThread;
@@ -94,16 +70,14 @@
 import com.sleepycat.je.DatabaseException;
 import org.opends.server.types.SearchScope;
 
+
+
 /**
- * ReplicationServer Listener.
- *
- * This singleton is the main object of the replication server
- * It waits for the incoming connections and create listener
- * and publisher objects for
- * connection with LDAP servers and with replication servers
- *
- * It is responsible for creating the replication server replicationServerDomain
- * and managing it
+ * ReplicationServer Listener. This singleton is the main object of the
+ * replication server It waits for the incoming connections and create listener
+ * and publisher objects for connection with LDAP servers and with replication
+ * servers It is responsible for creating the replication server
+ * replicationServerDomain and managing it
  */
 public final class ReplicationServer
   implements ConfigurationChangeListener<ReplicationServerCfg>,
@@ -126,7 +100,6 @@
   private final Map<String, ReplicationServerDomain> baseDNs =
           new HashMap<String, ReplicationServerDomain>();
 
-  private String localURL = "null";
   private volatile boolean shutdown = false;
   private ReplicationDbEnv dbEnv;
   private int rcvWindow;
@@ -413,12 +386,17 @@
       while (!shutdown)
       {
         /*
-         * periodically check that we are connected to all other replication
+         * Periodically check that we are connected to all other replication
          * servers and if not establish the connection
          */
         for (ReplicationServerDomain domain : getReplicationServerDomains())
         {
-          Set<String> connectedReplServers = domain.getChangelogs();
+          // Create a normalized set of server URLs.
+          final Set<String> connectedReplServers = new HashSet<String>();
+          for (String url : domain.getChangelogs())
+          {
+            connectedReplServers.add(normalizeServerURL(url));
+          }
 
           /*
            * check that all replication server in the config are in the
@@ -426,51 +404,38 @@
            */
           for (String serverURL : replicationServers)
           {
-            int separator = serverURL.lastIndexOf(':');
-            String port = serverURL.substring(separator + 1);
-            String hostname = serverURL.substring(0, separator);
-
+            final int separator = serverURL.lastIndexOf(':');
+            final String portString = serverURL.substring(separator + 1);
+            final int port = Integer.parseInt(portString);
+            final String hostname = serverURL.substring(0, separator);
+            final InetAddress inetAddress;
             try
             {
-              InetAddress inetAddress = InetAddress
-                  .getByName(hostname);
-              String serverAddress = inetAddress.getHostAddress()
-                  + ":" + port;
-              String alternServerAddress = null;
-
-              if (hostname.equalsIgnoreCase("localhost"))
-              {
-                // if "localhost" was used as the hostname in the configuration
-                // also check is the connection is already opened with the
-                // local address.
-                alternServerAddress = InetAddress.getLocalHost()
-                    .getHostAddress() + ":" + port;
-              }
-
-              if (inetAddress.equals(InetAddress.getLocalHost()))
-              {
-                // if the host address is the local one, also check
-                // if the connection is already opened with the "localhost"
-                // address
-                alternServerAddress = "127.0.0.1" + ":" + port;
-              }
-
-              if ((serverAddress.compareTo("127.0.0.1:"
-                  + replicationPort) != 0)
-                  && (serverAddress.compareTo(this.localURL) != 0)
-                  && (!connectedReplServers.contains(serverAddress)
-                      && ((alternServerAddress == null) || !connectedReplServers
-                      .contains(alternServerAddress))))
-              {
-                connect(serverURL, domain.getBaseDn());
-              }
+              inetAddress = InetAddress.getByName(hostname);
             }
-            catch (IOException e)
+            catch (UnknownHostException e)
             {
-              Message message = ERR_COULD_NOT_SOLVE_HOSTNAME
-                  .get(hostname);
+              // If the host name cannot be resolved then no chance of
+              // connecting anyway.
+              Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
               logError(message);
+              continue;
             }
+
+            // Avoid connecting to self.
+            if (isLocalAddress(inetAddress) && (port == replicationPort))
+            {
+              continue;
+            }
+
+            // Don't connect to a server if it is already connected.
+            final String normalizedServerURL = normalizeServerURL(serverURL);
+            if (connectedReplServers.contains(normalizedServerURL))
+            {
+              continue;
+            }
+
+            connect(serverURL, domain.getBaseDn());
           }
         }
 
@@ -574,9 +539,7 @@
        * Open replicationServer socket
        */
       String localhostname = InetAddress.getLocalHost().getHostName();
-      String localAdddress = InetAddress.getLocalHost().getHostAddress();
       serverURL = localhostname + ":" + String.valueOf(changelogPort);
-      localURL = localAdddress + ":" + String.valueOf(changelogPort);
       listenSocket = new ServerSocket();
       listenSocket.bind(new InetSocketAddress(changelogPort));
 
@@ -1074,9 +1037,7 @@
 
         replicationPort = newPort;
         String localhostname = InetAddress.getLocalHost().getHostName();
-        String localAdddress = InetAddress.getLocalHost().getHostAddress();
         serverURL = localhostname + ":" + String.valueOf(replicationPort);
-        localURL = localAdddress + ":" + String.valueOf(replicationPort);
         listenSocket = new ServerSocket();
         listenSocket.bind(new InetSocketAddress(replicationPort));
 
@@ -2047,4 +2008,35 @@
     return dbDirname;
   }
 
+
+
+  private String normalizeServerURL(final String url)
+  {
+    final int separator = url.lastIndexOf(':');
+    final String portString = url.substring(separator + 1);
+    final String hostname = url.substring(0, separator);
+    try
+    {
+      final InetAddress inetAddress = InetAddress.getByName(hostname);
+
+      if (isLocalAddress(inetAddress))
+      {
+        // It doesn't matter whether we use an IP or hostname here.
+        return InetAddress.getLocalHost().getHostAddress() + ":" + portString;
+      }
+      else
+      {
+        return inetAddress.getHostAddress() + ":" + portString;
+      }
+    }
+    catch (UnknownHostException e)
+    {
+      // This should not happen, but if it does then just default to the
+      // original URL.
+      Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
+      logError(message);
+
+      return url;
+    }
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 860c365..0ed1b10 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -32,6 +32,7 @@
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.collectionToString;
+import static org.opends.server.util.StaticUtils.isLocalAddress;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
@@ -390,7 +391,7 @@
     InetAddress[] rs1Addresses = null;
     try
     {
-      if (rs1.equals("localhost") || rs1.equals("127.0.0.1"))
+      if (isLocalAddress(rs1))
       {
         // Replace localhost with the local official hostname
         rs1 = InetAddress.getLocalHost().getHostName();
@@ -406,7 +407,7 @@
     InetAddress[] rs2Addresses = null;
     try
     {
-      if (rs2.equals("localhost") || rs2.equals("127.0.0.1"))
+      if (isLocalAddress(rs1))
       {
         // Replace localhost with the local official hostname
         rs2 = InetAddress.getLocalHost().getHostName();
diff --git a/opends/src/server/org/opends/server/util/StaticUtils.java b/opends/src/server/org/opends/server/util/StaticUtils.java
index 84ad567..26dbd27 100644
--- a/opends/src/server/org/opends/server/util/StaticUtils.java
+++ b/opends/src/server/org/opends/server/util/StaticUtils.java
@@ -34,27 +34,14 @@
 
 import java.io.*;
 import java.lang.reflect.InvocationTargetException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
+import java.net.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.RandomAccess;
-import java.util.StringTokenizer;
-import java.util.TimeZone;
+import java.util.*;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
@@ -4570,5 +4557,79 @@
       }
     }
   }
+
+
+
+  /**
+   * Returns {@code true} if the provided IPv4 or IPv6 address or host name
+   * represents the address of one of the interfaces on the current host
+   * machine.
+   *
+   * @param addressString
+   *          The IPv4 or IPv6 address or host name.
+   * @return {@code true} if the provided IPv4 or IPv6 address or host name
+   *         represents the address of one of the interfaces on the current host
+   *         machine.
+   */
+  public static boolean isLocalAddress(String addressString)
+  {
+    try
+    {
+      return isLocalAddress(InetAddress.getByName(addressString));
+    }
+    catch (UnknownHostException e)
+    {
+      return false;
+    }
+  }
+
+
+
+  /**
+   * Returns {@code true} if the provided {@code InetAddress} represents the
+   * address of one of the interfaces on the current host machine.
+   *
+   * @param address
+   *          The network address.
+   * @return {@code true} if the provided {@code InetAddress} represents the
+   *         address of one of the interfaces on the current host machine.
+   */
+  public static boolean isLocalAddress(InetAddress address)
+  {
+    if (address.isLoopbackAddress())
+    {
+      return true;
+    }
+    else
+    {
+      Enumeration<NetworkInterface> i;
+      try
+      {
+        i = NetworkInterface.getNetworkInterfaces();
+      }
+      catch (SocketException e)
+      {
+        // Unable to determine whether the address is local.
+        TRACER.debugCaught(DebugLogLevel.WARNING, e);
+        return false;
+      }
+
+      while (i.hasMoreElements())
+      {
+        NetworkInterface n = i.nextElement();
+        Enumeration<InetAddress> j = n.getInetAddresses();
+        while (j.hasMoreElements())
+        {
+          InetAddress localAddress = j.nextElement();
+          if (localAddress.equals(address))
+          {
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+  }
+
 }
 

--
Gitblit v1.10.0