From 3e4eefdcd60043206832b3bd5a00409810c6d9b6 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 30 May 2013 16:38:37 +0000
Subject: [PATCH] Partial fix for OPENDJ-875: Use of hostnames in replication protocol causes failover problems
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 49 +++++-----------
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 107 ++++++++++++++++++++++-------------
2 files changed, 83 insertions(+), 73 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 0e75c5e..aa280d0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -165,6 +165,7 @@
private ECLWorkflowElement eclwe;
private WorkflowImpl externalChangeLogWorkflowImpl = null;
+ // FIXME: why is this a set of ports? Do we claim to support multiple ports?
private static HashSet<Integer> localPorts = new HashSet<Integer>();
// Monitors for synchronizing domain creation with the connect thread.
@@ -245,7 +246,7 @@
monitoringPublisherPeriod = configuration.getMonitoringPeriod();
replSessionSecurity = new ReplSessionSecurity();
- initialize(replicationPort);
+ initialize();
configuration.addChangeListener(this);
try
{
@@ -521,12 +522,8 @@
/**
* initialization function for the replicationServer.
- *
- * @param changelogPort The port on which the replicationServer should
- * listen.
- *
*/
- private void initialize(int changelogPort)
+ private void initialize()
{
shutdown = false;
@@ -542,9 +539,9 @@
* Open replicationServer socket
*/
String localhostname = InetAddress.getLocalHost().getHostName();
- serverURL = localhostname + ":" + String.valueOf(changelogPort);
+ serverURL = localhostname + ":" + String.valueOf(replicationPort);
listenSocket = new ServerSocket();
- listenSocket.bind(new InetSocketAddress(changelogPort));
+ listenSocket.bind(new InetSocketAddress(replicationPort));
/*
* creates working threads
@@ -593,7 +590,7 @@
} catch (IOException e)
{
Message message =
- ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
+ ERR_COULD_NOT_BIND_CHANGELOG.get(replicationPort, e.getMessage());
logError(message);
} catch (DirectoryException e)
{
@@ -1339,7 +1336,7 @@
boolean successful)
{
if (backend.getBackendID().equals(backendId))
- initialize(this.replicationPort);
+ initialize();
}
/**
@@ -1602,33 +1599,17 @@
}
/**
- * This method allows to check if the Replication Server given
- * as the parameter is running in the local JVM.
+ * Returns {@code true} if the provided port is one of the ports that this
+ * replication server is listening on.
*
- * @param server The Replication Server that should be checked.
- *
- * @return a boolean indicating if the Replication Server given
- * as the parameter is running in the local JVM.
+ * @param port
+ * The port to be checked.
+ * @return {@code true} if the provided port is one of the ports that this
+ * replication server is listening on.
*/
- public static boolean isLocalReplicationServer(String server)
+ public static boolean isLocalReplicationServerPort(int port)
{
- int separator = server.lastIndexOf(':');
- if (separator == -1)
- return false;
- int port = Integer.parseInt(server.substring(separator + 1));
- String hostname = server.substring(0, separator);
- try
- {
- InetAddress localAddr = InetAddress.getLocalHost();
-
- return localPorts.contains(port)
- && (InetAddress.getByName(hostname).isLoopbackAddress() ||
- InetAddress.getByName(hostname).equals(localAddr));
-
- } catch (UnknownHostException e)
- {
- return false;
- }
+ return localPorts.contains(port);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 6d4c018..bc1b1de 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.replication.server.ReplicationServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
@@ -67,7 +68,6 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
@@ -372,7 +372,7 @@
* replication server instance, false otherwise.
*/
private static boolean isSameReplicationServerUrl(String rs1Url,
- String rs2Url)
+ String rs2Url)
{
// Get and compare ports of RS1 and RS2
int separator1 = rs1Url.lastIndexOf(':');
@@ -397,46 +397,54 @@
}
// Get and compare addresses of RS1 and RS2
- String rs1 = rs1Url.substring(0, separator1);
- InetAddress[] rs1Addresses;
+ final String rs1 = rs1Url.substring(0, separator1);
+ final InetAddress[] rs1Addresses;
try
{
- if (isLocalAddress(rs1))
- {
- // Replace localhost with the local official hostname
- rs1 = InetAddress.getLocalHost().getHostName();
- }
- rs1Addresses = InetAddress.getAllByName(rs1);
- } catch (UnknownHostException ex)
+ // Normalize local address to null.
+ rs1Addresses = isLocalAddress(rs1) ? null : InetAddress.getAllByName(rs1);
+ }
+ catch (UnknownHostException ex)
{
// Unknown RS: should not happen
return false;
}
- String rs2 = rs2Url.substring(0, separator2);
- InetAddress[] rs2Addresses;
+ final String rs2 = rs2Url.substring(0, separator2);
+ final InetAddress[] rs2Addresses;
try
{
- if (isLocalAddress(rs1))
- {
- // Replace localhost with the local official hostname
- rs2 = InetAddress.getLocalHost().getHostName();
- }
- rs2Addresses = InetAddress.getAllByName(rs2);
- } catch (UnknownHostException ex)
+ // Normalize local address to null.
+ rs2Addresses = isLocalAddress(rs2) ? null : InetAddress.getAllByName(rs2);
+ }
+ catch (UnknownHostException ex)
{
// Unknown RS: should not happen
return false;
}
- // Now compare addresses, if at least one match, this is the same server
- for (InetAddress inetAddress1 : rs1Addresses)
+ // Now compare addresses, if at least one match, this is the same server.
+ if (rs1Addresses == null && rs2Addresses == null)
{
- for (InetAddress inetAddress2 : rs2Addresses)
+ // Both local addresses.
+ return true;
+ }
+ else if (rs1Addresses == null || rs2Addresses == null)
+ {
+ // One local address and one non-local.
+ return false;
+ }
+ else
+ {
+ // Both non-local addresses: check for overlap.
+ for (InetAddress inetAddress1 : rs1Addresses)
{
- if (inetAddress2.equals(inetAddress1))
+ for (InetAddress inetAddress2 : rs2Addresses)
{
- return true;
+ if (inetAddress2.equals(inetAddress1))
+ {
+ return true;
+ }
}
}
}
@@ -1564,8 +1572,8 @@
keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
myState, localServerId), sameGenerationId);
}
- // Some servers in the local VM ?
- bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
+ // Some servers in the local VM or local host?
+ bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
/**
* Now apply the choice base on the weight to the best servers list
@@ -1795,26 +1803,47 @@
}
}
+
+
/**
- * Creates a new list that contains only replication servers that are in the
- * same VM as the local DS, from a passed replication server list.
- * @param bestServers The list of replication servers to filter
- * @return The sub list of replication servers being in the same VM as the
- * local DS (which may be empty)
+ * Creates a new list that contains only replication servers that are on the
+ * same host as the local DS, from a passed replication server list. This
+ * method will gives priority to any replication server which is in the same
+ * VM as this DS.
+ *
+ * @param bestServers
+ * The list of replication servers to filter
+ * @return The sub list of replication servers being on the same host as the
+ * local DS (which may be empty)
*/
- private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
- Map<Integer, ReplicationServerInfo> bestServers)
+ private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
+ Map<Integer, ReplicationServerInfo> bestServers)
{
Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
-
+ new HashMap<Integer, ReplicationServerInfo>();
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
- if (ReplicationServer.isLocalReplicationServer(
- replicationServerInfo.getServerURL()))
+ String server = replicationServerInfo.getServerURL();
+ int separator = server.lastIndexOf(':');
+ if (separator > 0)
{
- result.put(rsId, replicationServerInfo);
+ String hostname = server.substring(0, separator);
+ if (isLocalAddress(hostname))
+ {
+ int port = Integer.parseInt(server.substring(separator + 1));
+ if (isLocalReplicationServerPort(port))
+ {
+ // An RS in the same VM will always have priority.
+ result.clear();
+ result.put(rsId, replicationServerInfo);
+ break;
+ }
+ else
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
}
}
return result;
--
Gitblit v1.10.0