From 698acc05bd454ec83c21ae93ecdcf168bf7ed16d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Aug 2007 13:56:14 +0000
Subject: [PATCH] issue 2119 : replication servers do not connect to each other when using setup

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                        |   82 +++++++++++++++++++++++----
 opendj-sdk/opends/src/messages/messages/replication.properties                                                                  |    8 ++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java |   87 +++++++++++++++++++++++++++++
 3 files changed, 164 insertions(+), 13 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 0b61dc0..24b6c78 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -187,4 +187,10 @@
 MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \
  information for attribute %s which is not defined in the schema. This \
  information will be ignored
-NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
\ No newline at end of file
+NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
+SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \
+ be closed : %s
+SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \
+ replication server port could not be stopped : %s
+DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \
+ listening on the replication port
\ No newline at end of file
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 3e28dc0..48b6f69 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -80,8 +80,8 @@
   private String serverURL;
 
   private ServerSocket listenSocket;
-  private Thread myListenThread;
-  private Thread myConnectThread;
+  private Thread listenThread;
+  private Thread connectThread;
 
   /* The list of replication servers configured by the administrator */
   private Collection<String> replicationServers;
@@ -102,6 +102,7 @@
   private long trimAge; // the time (in sec) after which the  changes must
   private int replicationPort;
                         // de deleted from the persistent storage.
+  private boolean stopListen = false;
 
   /**
    * Creates a new Replication server using the provided configuration entry.
@@ -164,7 +165,7 @@
   void runListen()
   {
     Socket newSocket = null;
-    while (shutdown == false)
+    while ((shutdown == false) && (stopListen  == false))
     {
       // Wait on the replicationServer port.
       // Read incoming messages and create LDAP or ReplicationServer listener
@@ -181,8 +182,11 @@
         handler.start(null, serverId, serverURL, rcvWindow, this);
       } catch (IOException e)
       {
-        // ignore
-        // TODO add some logging to allow problem debugging
+        // The socket has probably been closed as part of the
+        // shutdown or changing the port number process.
+        // just log debug information and loop.
+        Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
+        logError(message);
       }
     }
   }
@@ -318,12 +322,12 @@
       /*
        * create working threads
        */
-      myListenThread =
+      listenThread =
         new ReplicationServerListenThread("Replication Server Listener", this);
-      myListenThread.start();
-      myConnectThread =
+      listenThread.start();
+      connectThread =
         new ReplicationServerConnectThread("Replication Server Connect", this);
-      myConnectThread.start();
+      connectThread.start();
 
     } catch (DatabaseException e)
     {
@@ -374,9 +378,9 @@
     shutdown = true;
 
     // shutdown the connect thread
-    if (myConnectThread != null)
+    if (connectThread != null)
     {
-      myConnectThread.interrupt();
+      connectThread.interrupt();
     }
 
     // shutdown the listener thread
@@ -469,7 +473,61 @@
   public ConfigChangeResult applyConfigurationChange(
       ReplicationServerCfg configuration)
   {
-    // TODO : implement this
+    // Changing those properties don't need specific code.
+    // They will be applied for next connections.
+    replicationServers = configuration.getReplicationServer();
+    if (replicationServers == null)
+      replicationServers = new ArrayList<String>();
+    queueSize = configuration.getQueueSize();
+    trimAge = configuration.getReplicationPurgeDelay();
+    rcvWindow = configuration.getWindowSize();
+
+    // changing the listen port requires to stop the listen thread
+    // and restart it.
+    int newPort = configuration.getReplicationPort();
+    if (newPort != replicationPort)
+    {
+      stopListen = true;
+      try
+      {
+        listenSocket.close();
+        listenThread.join();
+        stopListen = false;
+
+        replicationPort = newPort;
+        String localhostname = InetAddress.getLocalHost().getHostName();
+        String localAdddress = InetAddress.getLocalHost().getHostAddress();
+        serverURL = localhostname + ":" + String.valueOf(replicationPort);
+        localURL = localAdddress + ":" + String.valueOf(replicationPort);
+        listenSocket = new ServerSocket();
+        listenSocket.setReceiveBufferSize(1000000);
+        listenSocket.bind(new InetSocketAddress(replicationPort));
+
+        listenThread =
+          new ReplicationServerListenThread(
+              "Replication Server Listener", this);
+        listenThread.start();
+      }
+      catch (IOException e)
+      {
+        Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString());
+        logError(message);
+        new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
+      }
+      catch (InterruptedException e)
+      {
+        Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString());
+        logError(message);
+        new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
+      }
+    }
+
+    if ((configuration.getReplicationDbDirectory() != null) &&
+        (dbDirname != configuration.getReplicationDbDirectory()))
+    {
+      return new ConfigChangeResult(ResultCode.SUCCESS, true);
+    }
+
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
new file mode 100644
index 0000000..4c919d8
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -0,0 +1,87 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.testng.Assert.assertTrue;
+
+import java.net.ServerSocket;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.types.DN;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that we can dynamically modify the configuration of replicationServer 
+ */
+
+public class ReplicationServerDynamicConfTest extends ReplicationTestCase
+{
+  /**
+   * That that the applyConfigurationChange methos of the ReplicationServer
+   * class.
+   */
+  @Test()
+  public void replServerApplyChangeTest() throws Exception
+  {
+    TestCaseUtils.startServer();
+
+    // find two free ports for the replication Server port
+    ServerSocket socket1 = TestCaseUtils.bindFreePort();
+    int replicationServerPort = socket1.getLocalPort();
+    ServerSocket socket2 = TestCaseUtils.bindFreePort();
+    int newReplicationServerPort = socket2.getLocalPort();
+    socket1.close();
+    socket2.close();
+
+    // instantiate a Replication server using the first port number.
+    ReplServerFakeConfiguration conf =
+      new ReplServerFakeConfiguration(
+          replicationServerPort, null, 0, 1, 0, 0, null);
+    ReplicationServer replicationServer = new ReplicationServer(conf);
+   
+    // Most of the configuration change are trivial to apply.
+    // The interesting change is the change of the replication server port.
+    // build a new ReplServerFakeConfiguration with a new server port
+    // apply this new configuration and check that it is now possible to 
+    // connect to this new portnumber.
+    ReplServerFakeConfiguration newconf =
+      new ReplServerFakeConfiguration(
+          newReplicationServerPort, null, 0, 1, 0, 0, null);
+    
+    replicationServer.applyConfigurationChange(newconf);
+    
+    ReplicationBroker broker = openReplicationSession(
+        DN.decode("dc=example"), (short) 1, 10, newReplicationServerPort,
+        1000, false);
+    
+    // check that the sendWindow is not null to make sure that the 
+    // broker did connect successfully.
+    assertTrue(broker.getCurrentSendWindow() != 0);
+  }
+}

--
Gitblit v1.10.0