opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -52,7 +52,9 @@ tries to connect at startup time. </adm:synopsis> <adm:description> Addresses must be specified using the syntax: hostname:port Addresses must be specified using the syntax: "hostname:port". If IPv6 addresses are used as the hostname, they must be specified using the syntax "[IPv6Address]:port". </adm:description> <adm:default-behavior> <adm:undefined /> opends/src/admin/messages/ReplicationServerCfgDefn.properties
@@ -14,7 +14,7 @@ property.replication-port.synopsis=The port on which this Replication Server waits for connections from other Replication Servers or Directory Servers. property.replication-purge-delay.synopsis=The time (in seconds) after which the Replication Server erases all persistent information. property.replication-server.synopsis=Specifies the addresses of other Replication Servers to which this Replication Server tries to connect at startup time. property.replication-server.description=Addresses must be specified using the syntax: hostname:port property.replication-server.description=Addresses must be specified using the syntax: "hostname:port". If IPv6 addresses are used as the hostname, they must be specified using the syntax "[IPv6Address]:port". property.replication-server.syntax.string.pattern.synopsis=A host name followed by a ":" and a port number. property.replication-server-id.synopsis=Specifies a unique identifier for the Replication Server. property.replication-server-id.description=Each Replication Server must have a different server ID. opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -25,14 +25,8 @@ * Copyright 2008 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.protocol; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import java.io.IOException; import java.net.Socket; import java.util.SortedSet; @@ -47,7 +41,9 @@ import org.opends.server.types.CryptoManager; import org.opends.server.types.DirectoryConfig; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.util.StaticUtils.*; /** * This class represents the security configuration for replication protocol @@ -171,12 +167,10 @@ { // Create a new SSL context every time to make sure we pick up the // latest contents of the trust store. final CryptoManager cryptoManager = DirectoryConfig .getCryptoManager(); final CryptoManager cryptoManager = DirectoryConfig.getCryptoManager(); final SSLContext sslContext = cryptoManager .getSslContext(sslCertNickname); final SSLSocketFactory sslSocketFactory = sslContext .getSocketFactory(); final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); secureSocket = (SSLSocket) sslSocketFactory.createSocket( socket, socket.getInetAddress().getHostName(), @@ -203,26 +197,8 @@ { if (!hasCompleted) { try { socket.close(); } catch (final Exception ignored) { // Ignore. } if (secureSocket != null) { try { secureSocket.close(); } catch (final Exception ignored) { // Ignore. } } close(socket); close(secureSocket); } } } @@ -254,12 +230,10 @@ { // Create a new SSL context every time to make sure we pick up the // latest contents of the trust store. final CryptoManager cryptoManager = DirectoryConfig .getCryptoManager(); final CryptoManager cryptoManager = DirectoryConfig.getCryptoManager(); final SSLContext sslContext = cryptoManager .getSslContext(sslCertNickname); final SSLSocketFactory sslSocketFactory = sslContext .getSocketFactory(); final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); secureSocket = (SSLSocket) sslSocketFactory.createSocket( socket, socket.getInetAddress().getHostName(), @@ -298,26 +272,8 @@ { if (!hasCompleted) { try { socket.close(); } catch (final Exception ignored) { // Ignore. } if (secureSocket != null) { try { secureSocket.close(); } catch (final Exception ignored) { // Ignore. } } close(socket); close(secureSocket); } } } @@ -328,12 +284,10 @@ * Determine whether sessions to a given replication server should be * encrypted. * * @param serverURL * The replication server URL. * @return true if sessions to the given replication server should be * encrypted, or false if they should not be encrypted. */ public boolean isSslEncryption(final String serverURL) public boolean isSslEncryption() { // Currently use global settings from the crypto manager. return sslEncryption; opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -254,6 +254,16 @@ allInstances.add(this); } private Set<HostPort> toHostPorts(Collection<String> serverAddresses) { final Set<HostPort> results = new HashSet<HostPort>(); for (String serverAddress : serverAddresses) { results.add(HostPort.valueOf(serverAddress)); } return results; } /** * Get the list of every replication servers instantiated in the current VM. * @return The list of every replication servers instantiated in the current @@ -369,7 +379,7 @@ { while (!shutdown) { final String normalizedLocalURL = getNormalizedLocalURL(); final HostPort localAddress = HostPort.localAddress(replicationPort); for (ReplicationServerDomain domain : getReplicationServerDomains()) { /* @@ -378,22 +388,23 @@ * cannot guarantee this since the configuration may not contain this * RS. */ final Set<String> connectedRSUrls = getConnectedRSUrls(domain); for (String rsURL : replicationServerUrls) final Set<HostPort> connectedRSAddresses = getConnectedRSAddresses(domain); for (HostPort rsAddress : toHostPorts(replicationServerUrls)) { final String normalizedServerURL = normalizeServerURL(rsURL); if (connectedRSUrls.contains(normalizedServerURL)) if (connectedRSAddresses.contains(rsAddress)) { continue; // Skip: already connected. } // FIXME: this will need changing if we ever support listening on // specific addresses. if (normalizedServerURL.equals(normalizedLocalURL)) { if (rsAddress.equals(localAddress)) { continue; // Skip: avoid connecting to self. } connect(rsURL, domain.getBaseDN()); connect(rsAddress, domain.getBaseDN()); } } @@ -420,12 +431,12 @@ } } private Set<String> getConnectedRSUrls(ReplicationServerDomain domain) private Set<HostPort> getConnectedRSAddresses(ReplicationServerDomain domain) { Set<String> results = new HashSet<String>(); Set<HostPort> results = new HashSet<HostPort>(); for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values()) { results.add(normalizeServerURL(rsHandler.getServerAddressURL())); results.add(HostPort.valueOf(rsHandler.getServerAddressURL())); } return results; } @@ -433,29 +444,26 @@ /** * Establish a connection to the server with the address and port. * * @param remoteServerURL * The address and port for the server, separated by a colon. * @param remoteServerAddress * The address and port for the server * @param baseDN * The baseDN of the connection */ private void connect(String remoteServerURL, DN baseDN) private void connect(HostPort remoteServerAddress, DN baseDN) { boolean sslEncryption =replSessionSecurity.isSslEncryption(remoteServerURL); boolean sslEncryption = replSessionSecurity.isSslEncryption(); if (debugEnabled()) TRACER.debugInfo("RS " + getMonitorInstanceName() + " connects to " + remoteServerURL); + remoteServerAddress); Socket socket = new Socket(); Session session = null; try { final HostPort hp = HostPort.valueOf(remoteServerURL); final InetSocketAddress serverAddr = new InetSocketAddress( InetAddress.getByName(hp.getHost()), hp.getPort()); socket.setTcpNoDelay(true); int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); socket.connect(serverAddr, timeoutMS); socket.connect(remoteServerAddress.toInetSocketAddress(), timeoutMS); session = replSessionSecurity.createClientSession(socket, timeoutMS); ReplicationServerHandler rsHandler = new ReplicationServerHandler( @@ -881,7 +889,8 @@ // They will be applied for next connections. Some others have immediate // effect disconnectRemovedReplicationServers(configuration.getReplicationServer()); disconnectRemovedReplicationServers( toHostPorts(configuration.getReplicationServer())); replicationServerUrls = configuration.getReplicationServer(); if (replicationServerUrls == null) @@ -983,9 +992,11 @@ /** * Try and set a sensible URL for this replication server. Since we are * listening on all addresses there are a couple of potential candidates: 1) a * matching server url in the replication server's configuration, 2) hostname * local address. * listening on all addresses there are a couple of potential candidates: * <ol> * <li>a matching server URL in the replication server's configuration,</li> * <li>hostname local address.</li> * </ol> */ private void setServerURL() throws UnknownHostException { @@ -993,23 +1004,23 @@ * First try the set of configured replication servers to see if one of them * is this replication server (this should always be the case). */ for (String rsUrl : replicationServerUrls) for (HostPort rsAddress : toHostPorts(replicationServerUrls)) { /* * No need validate the string format because the admin framework has * already done it. */ final HostPort hp = HostPort.valueOf(rsUrl); if (hp.getPort() == replicationPort && hp.isLocalAddress()) if (rsAddress.getPort() == replicationPort && rsAddress.isLocalAddress()) { serverURL = rsUrl; serverURL = rsAddress.toString(); return; } } // Fall-back to the machine hostname. serverURL = InetAddress.getLocalHost().getHostName() + ":" + replicationPort; final String host = InetAddress.getLocalHost().getHostName(); // Ensure correct formatting of IPv6 addresses by using a HostPort instance. serverURL = new HostPort(host, replicationPort).toString(); } /** @@ -1330,32 +1341,21 @@ } /** * Compute the list of replication servers that are not any * more connected to this Replication Server and stop the * corresponding handlers. * @param newReplServers the list of the new replication servers configured. * Compute the list of replication servers that are not any more connected to * this Replication Server and stop the corresponding handlers. * * @param newRSAddresses * the list of addresses of the newly configured replication servers. */ private void disconnectRemovedReplicationServers( Collection<String> newReplServers) private void disconnectRemovedReplicationServers(Set<HostPort> newRSAddresses) { Collection<String> serversToDisconnect = new ArrayList<String>(); final Collection<HostPort> serversToDisconnect = new ArrayList<HostPort>(); for (String rsUrl : replicationServerUrls) for (HostPort rsAddress : toHostPorts(replicationServerUrls)) { if (!newReplServers.contains(rsUrl)) if (!newRSAddresses.contains(rsAddress)) { try { // translate the server name into IP address and keep the port number final HostPort hp = HostPort.valueOf(rsUrl); final String hostAddress = InetAddress.getByName(hp.getHost()).getHostAddress(); serversToDisconnect.add(hostAddress + ":" + hp.getPort()); } catch (IOException e) { logError(ERR_COULD_NOT_SOLVE_HOSTNAME.get(rsUrl)); } serversToDisconnect.add(rsAddress); } } @@ -1747,65 +1747,6 @@ return this.changelogDB.getDBDirectoryName(); } /** * Normalize a URL so that this host's local address is used if the provided * host name corresponds to a local interface. This method is design to work * with getNormalizedLocalURL(). */ private String normalizeServerURL(final String url) { final HostPort hp = HostPort.valueOf(url); try { InetAddress inetAddress = InetAddress.getByName(hp.getHost()); if (HostPort.isLocalAddress(inetAddress)) { inetAddress = getLocalAddress(); } return inetAddress.getHostAddress() + ":" + hp.getPort(); } catch (UnknownHostException e) { // This should not happen, but if it does then just default to the // original URL. Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hp.getHost()); logError(message); return url; } } private InetAddress getLocalAddress() { try { return InetAddress.getLocalHost(); } catch (UnknownHostException e) { try { return InetAddress.getByAddress("localhost", new byte[] { 0x7f, 0x00, 0x00, 0x01 }); } catch (UnknownHostException never) { // Illegal address length. throw new RuntimeException(never); } } } /** * Return normalized local url suitable for comparison against result returned * by normalizeServerURL(). */ private String getNormalizedLocalURL() { return getLocalAddress().getHostAddress() + ":" + replicationPort; } /** {@inheritDoc} */ @Override public String toString() opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -896,14 +896,16 @@ /** * Stop operations with a list of replication servers. * * @param replServerURLs * the replication servers URLs for which we want to stop operations * @param serversToDisconnect * the replication servers addresses for which we want to stop * operations */ public void stopReplicationServers(Collection<String> replServerURLs) public void stopReplicationServers(Collection<HostPort> serversToDisconnect) { for (ReplicationServerHandler rsHandler : connectedRSs.values()) { if (replServerURLs.contains(rsHandler.getServerAddressURL())) if (serversToDisconnect.contains( HostPort.valueOf(rsHandler.getServerAddressURL()))) { stopServer(rsHandler, false); } opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -82,8 +82,7 @@ generationId = inReplServerStartMsg.getGenerationId(); serverId = inReplServerStartMsg.getServerId(); serverURL = inReplServerStartMsg.getServerURL(); final int port = HostPort.valueOf(serverURL).getPort(); serverAddressURL = session.getRemoteAddress() + ":" + port; serverAddressURL = toServerAddressURL(serverURL); DN baseDN = DN.decode(inReplServerStartMsg.getBaseDn()); setBaseDNAndDomain(baseDN, false); setInitialServerState(inReplServerStartMsg.getServerState()); @@ -105,6 +104,13 @@ return inReplServerStartMsg.getSSLEncryption(); } private String toServerAddressURL(String serverURL) { final int port = HostPort.valueOf(serverURL).getPort(); // Ensure correct formatting of IPv6 addresses by using a HostPort instance. return new HostPort(session.getRemoteAddress(), port).toString(); } /** * Sends a start message to the remote RS. * opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -403,35 +403,37 @@ private long generationId; private byte groupId = -1; private int serverId; // Received server URL /** Received server URL. */ private String serverURL; private String baseDn = null; private int windowSize; private ServerState serverState = null; private boolean sslEncryption; private int degradedStatusThreshold = -1; // Keeps the 1 value if created with a ReplServerStartMsg /** Keeps the 1 value if created with a ReplServerStartMsg. */ private int weight = 1; // Keeps the 0 value if created with a ReplServerStartMsg /** Keeps the 0 value if created with a ReplServerStartMsg. */ private int connectedDSNumber = 0; private List<Integer> connectedDSs = null; // Is this RS locally configured ? (the RS is recognized as a usable server) /** * Is this RS locally configured? (the RS is recognized as a usable server). */ private boolean locallyConfigured = true; /** * Create a new instance of ReplicationServerInfo wrapping the passed * message. * @param msg Message to wrap. * @param server Override serverURL. * @param newServerURL Override serverURL. * @return The new instance wrapping the passed message. * @throws IllegalArgumentException If the passed message has an unexpected * type. */ public static ReplicationServerInfo newInstance( ReplicationMsg msg, String server) throws IllegalArgumentException ReplicationMsg msg, String newServerURL) throws IllegalArgumentException { ReplicationServerInfo rsInfo = newInstance(msg); rsInfo.serverURL = server; rsInfo.serverURL = newServerURL; return rsInfo; } @@ -1152,17 +1154,13 @@ try { // Open a socket connection to the next candidate. final HostPort hp = HostPort.valueOf(server); InetSocketAddress serverAddr = new InetSocketAddress( InetAddress.getByName(hp.getHost()), hp.getPort()); socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); socket.connect(serverAddr, timeoutMS); socket.connect(HostPort.valueOf(server).toInetSocketAddress(), timeoutMS); localSession = replSessionSecurity.createClientSession(socket, timeoutMS); boolean isSslEncryption = replSessionSecurity .isSslEncryption(server); boolean isSslEncryption = replSessionSecurity.isSslEncryption(); // Send our ServerStartMsg. String url = socket.getLocalAddress().getHostName() + ":" opends/src/server/org/opends/server/types/HostPort.java
@@ -102,29 +102,6 @@ private static Set<InetAddress> localAddresses = new HashSet<InetAddress>(); /** * Returns {@code true} if the provided IPv4 or IPv6 address or host name * represents the address of one of the interfaces on the current host * machine. * * @param addressString * The IPv4 or IPv6 address or host name. * @return {@code true} if the provided IPv4 or IPv6 address or host name * represents the address of one of the interfaces on the current host * machine. */ public static boolean isLocalAddress(String addressString) { try { return isLocalAddress(InetAddress.getByName(addressString)); } catch (UnknownHostException e) { return false; } } /** * Returns {@code true} if the provided {@code InetAddress} represents the * address of one of the interfaces on the current host machine. * @@ -195,6 +172,20 @@ } /** * Builds a new instance of {@link HostPort} representing the local machine * with the supplied port. * * @param port * the port to use when building the new {@link HostPort} object * @return a new {@link HostPort} instance representing the local machine with * the supplied port. */ public static HostPort localAddress(int port) { return new HostPort(LOCALHOST, port); } /** * Creates a new {@code HostPort} object with the specified port number but no * host. * @@ -380,6 +371,20 @@ } /** * Converts the current object to an equivalent {@link InetSocketAddress} * object. * * @return a {@link InetSocketAddress} equivalent of the current object. * @throws UnknownHostException * If the current host name cannot be resolved to an * {@link InetAddress} */ public InetSocketAddress toInetSocketAddress() throws UnknownHostException { return new InetSocketAddress(InetAddress.getByName(getHost()), getPort()); } /** * Returns a string representation of this {@code HostPort} object. It will be * the host element (or nothing if no host was given) followed by a colon and * the port number. opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -23,10 +23,12 @@ * * * Copyright 2007-2009 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS */ package org.opends.server.replication.server; import java.util.SortedSet; import java.util.TreeSet; import org.opends.server.admin.Configuration; import org.opends.server.admin.server.ConfigurationChangeListener; @@ -40,30 +42,30 @@ */ public class ReplServerFakeConfiguration implements ReplicationServerCfg { int port; String dirName; int purgeDelay; int serverId; int queueSize; int windowSize; private int port; private String dirName; private int purgeDelay; private int serverId; private int queueSize; private int windowSize; private SortedSet<String> servers; /* * Assured mode properties */ // Timeout (in milliseconds) when waiting for acknowledgments /** Timeout (in milliseconds) when waiting for acknowledgments. */ private long assuredTimeout = 1000; // Group id /** Group id. */ private int groupId = 1; // Threshold for status analyzers /** Threshold for status analyzers. */ private int degradedStatusThreshold = 5000; // The weight of the server /** The weight of the server. */ private int weight = 1; // The monitoring publisher period /** The monitoring publisher period. */ private long monitoringPeriod = 3000; /** @@ -86,7 +88,7 @@ } this.serverId = serverId; if (queueSize == 0) { this.queueSize = 10000; @@ -105,9 +107,9 @@ this.windowSize = windowSize; } this.servers = servers; this.servers = servers != null ? servers : new TreeSet<String>(); } /** * Constructor with group id and assured info */ @@ -241,12 +243,12 @@ { return assuredTimeout; } public int getDegradedStatusThreshold() { return degradedStatusThreshold; } public void setDegradedStatusThreshold(int degradedStatusThreshold) { this.degradedStatusThreshold = degradedStatusThreshold; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -27,15 +27,15 @@ */ package org.opends.server.replication.server; import static org.opends.server.TestCaseUtils.*; import static org.testng.Assert.*; import org.opends.server.TestCaseUtils; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.types.DN; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.testng.Assert.*; /** * Tests that we can dynamically modify the configuration of replicationServer */ @@ -49,19 +49,16 @@ @Test() public void replServerApplyChangeTest() throws Exception { ReplicationServer replicationServer = null; TestCaseUtils.startServer(); ReplicationServer replicationServer = null; try { int[] ports = TestCaseUtils.findFreePorts(2); int replicationServerPort = ports[0]; int newReplicationServerPort = ports[1]; // instantiate a Replication server using the first port number. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration( replicationServerPort, null, 0, 1, 0, 0, null); ports[0], null, 0, 1, 0, 0, null); replicationServer = new ReplicationServer(conf); // Most of the configuration change are trivial to apply. @@ -71,12 +68,12 @@ // connect to this new portnumber. ReplServerFakeConfiguration newconf = new ReplServerFakeConfiguration( newReplicationServerPort, null, 0, 1, 0, 0, null); ports[1], null, 0, 1, 0, 0, null); replicationServer.applyConfigurationChange(newconf); ReplicationBroker broker = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), 1, 10, newReplicationServerPort, DN.decode(TEST_ROOT_DN_STRING), 1, 10, ports[1], 1000, false); // check that the sendWindow is not null to make sure that the @@ -85,7 +82,7 @@ } finally { replicationServer.remove(); remove(replicationServer); } } }