From 698acc05bd454ec83c21ae93ecdcf168bf7ed16d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Aug 2007 13:56:14 +0000
Subject: [PATCH] issue 2119 : replication servers do not connect to each other when using setup
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 82 +++++++++++++++++++++++----
opendj-sdk/opends/src/messages/messages/replication.properties | 8 ++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java | 87 +++++++++++++++++++++++++++++
3 files changed, 164 insertions(+), 13 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 0b61dc0..24b6c78 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -187,4 +187,10 @@
MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \
information for attribute %s which is not defined in the schema. This \
information will be ignored
-NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
\ No newline at end of file
+NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
+SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \
+ be closed : %s
+SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \
+ replication server port could not be stopped : %s
+DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \
+ listening on the replication port
\ No newline at end of file
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 3e28dc0..48b6f69 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -80,8 +80,8 @@
private String serverURL;
private ServerSocket listenSocket;
- private Thread myListenThread;
- private Thread myConnectThread;
+ private Thread listenThread;
+ private Thread connectThread;
/* The list of replication servers configured by the administrator */
private Collection<String> replicationServers;
@@ -102,6 +102,7 @@
private long trimAge; // the time (in sec) after which the changes must
private int replicationPort;
// de deleted from the persistent storage.
+ private boolean stopListen = false;
/**
* Creates a new Replication server using the provided configuration entry.
@@ -164,7 +165,7 @@
void runListen()
{
Socket newSocket = null;
- while (shutdown == false)
+ while ((shutdown == false) && (stopListen == false))
{
// Wait on the replicationServer port.
// Read incoming messages and create LDAP or ReplicationServer listener
@@ -181,8 +182,11 @@
handler.start(null, serverId, serverURL, rcvWindow, this);
} catch (IOException e)
{
- // ignore
- // TODO add some logging to allow problem debugging
+ // The socket has probably been closed as part of the
+ // shutdown or changing the port number process.
+ // just log debug information and loop.
+ Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
+ logError(message);
}
}
}
@@ -318,12 +322,12 @@
/*
* create working threads
*/
- myListenThread =
+ listenThread =
new ReplicationServerListenThread("Replication Server Listener", this);
- myListenThread.start();
- myConnectThread =
+ listenThread.start();
+ connectThread =
new ReplicationServerConnectThread("Replication Server Connect", this);
- myConnectThread.start();
+ connectThread.start();
} catch (DatabaseException e)
{
@@ -374,9 +378,9 @@
shutdown = true;
// shutdown the connect thread
- if (myConnectThread != null)
+ if (connectThread != null)
{
- myConnectThread.interrupt();
+ connectThread.interrupt();
}
// shutdown the listener thread
@@ -469,7 +473,61 @@
public ConfigChangeResult applyConfigurationChange(
ReplicationServerCfg configuration)
{
- // TODO : implement this
+ // Changing those properties don't need specific code.
+ // They will be applied for next connections.
+ replicationServers = configuration.getReplicationServer();
+ if (replicationServers == null)
+ replicationServers = new ArrayList<String>();
+ queueSize = configuration.getQueueSize();
+ trimAge = configuration.getReplicationPurgeDelay();
+ rcvWindow = configuration.getWindowSize();
+
+ // changing the listen port requires to stop the listen thread
+ // and restart it.
+ int newPort = configuration.getReplicationPort();
+ if (newPort != replicationPort)
+ {
+ stopListen = true;
+ try
+ {
+ listenSocket.close();
+ listenThread.join();
+ stopListen = false;
+
+ replicationPort = newPort;
+ String localhostname = InetAddress.getLocalHost().getHostName();
+ String localAdddress = InetAddress.getLocalHost().getHostAddress();
+ serverURL = localhostname + ":" + String.valueOf(replicationPort);
+ localURL = localAdddress + ":" + String.valueOf(replicationPort);
+ listenSocket = new ServerSocket();
+ listenSocket.setReceiveBufferSize(1000000);
+ listenSocket.bind(new InetSocketAddress(replicationPort));
+
+ listenThread =
+ new ReplicationServerListenThread(
+ "Replication Server Listener", this);
+ listenThread.start();
+ }
+ catch (IOException e)
+ {
+ Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString());
+ logError(message);
+ new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
+ }
+ catch (InterruptedException e)
+ {
+ Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString());
+ logError(message);
+ new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
+ }
+ }
+
+ if ((configuration.getReplicationDbDirectory() != null) &&
+ (dbDirname != configuration.getReplicationDbDirectory()))
+ {
+ return new ConfigChangeResult(ResultCode.SUCCESS, true);
+ }
+
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
new file mode 100644
index 0000000..4c919d8
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -0,0 +1,87 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.testng.Assert.assertTrue;
+
+import java.net.ServerSocket;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.types.DN;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that we can dynamically modify the configuration of replicationServer
+ */
+
+public class ReplicationServerDynamicConfTest extends ReplicationTestCase
+{
+ /**
+ * That that the applyConfigurationChange methos of the ReplicationServer
+ * class.
+ */
+ @Test()
+ public void replServerApplyChangeTest() throws Exception
+ {
+ TestCaseUtils.startServer();
+
+ // find two free ports for the replication Server port
+ ServerSocket socket1 = TestCaseUtils.bindFreePort();
+ int replicationServerPort = socket1.getLocalPort();
+ ServerSocket socket2 = TestCaseUtils.bindFreePort();
+ int newReplicationServerPort = socket2.getLocalPort();
+ socket1.close();
+ socket2.close();
+
+ // instantiate a Replication server using the first port number.
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(
+ replicationServerPort, null, 0, 1, 0, 0, null);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+
+ // Most of the configuration change are trivial to apply.
+ // The interesting change is the change of the replication server port.
+ // build a new ReplServerFakeConfiguration with a new server port
+ // apply this new configuration and check that it is now possible to
+ // connect to this new portnumber.
+ ReplServerFakeConfiguration newconf =
+ new ReplServerFakeConfiguration(
+ newReplicationServerPort, null, 0, 1, 0, 0, null);
+
+ replicationServer.applyConfigurationChange(newconf);
+
+ ReplicationBroker broker = openReplicationSession(
+ DN.decode("dc=example"), (short) 1, 10, newReplicationServerPort,
+ 1000, false);
+
+ // check that the sendWindow is not null to make sure that the
+ // broker did connect successfully.
+ assertTrue(broker.getCurrentSendWindow() != 0);
+ }
+}
--
Gitblit v1.10.0