| opends/src/messages/messages/replication.properties | ●●●●● patch | view | raw | blame | history | |
| opends/src/server/org/opends/server/replication/server/ReplicationServer.java | ●●●●● patch | view | raw | blame | history | |
| opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java | ●●●●● patch | view | raw | blame | history |
opends/src/messages/messages/replication.properties
@@ -187,4 +187,10 @@ MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \ information for attribute %s which is not defined in the schema. This \ information will be ignored NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \ be closed : %s SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \ replication server port could not be stopped : %s DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \ listening on the replication port opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -80,8 +80,8 @@ private String serverURL; private ServerSocket listenSocket; private Thread myListenThread; private Thread myConnectThread; private Thread listenThread; private Thread connectThread; /* The list of replication servers configured by the administrator */ private Collection<String> replicationServers; @@ -102,6 +102,7 @@ private long trimAge; // the time (in sec) after which the changes must private int replicationPort; // de deleted from the persistent storage. private boolean stopListen = false; /** * Creates a new Replication server using the provided configuration entry. @@ -164,7 +165,7 @@ void runListen() { Socket newSocket = null; while (shutdown == false) while ((shutdown == false) && (stopListen == false)) { // Wait on the replicationServer port. // Read incoming messages and create LDAP or ReplicationServer listener @@ -181,8 +182,11 @@ handler.start(null, serverId, serverURL, rcvWindow, this); } catch (IOException e) { // ignore // TODO add some logging to allow problem debugging // The socket has probably been closed as part of the // shutdown or changing the port number process. // just log debug information and loop. Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get(); logError(message); } } } @@ -318,12 +322,12 @@ /* * create working threads */ myListenThread = listenThread = new ReplicationServerListenThread("Replication Server Listener", this); myListenThread.start(); myConnectThread = listenThread.start(); connectThread = new ReplicationServerConnectThread("Replication Server Connect", this); myConnectThread.start(); connectThread.start(); } catch (DatabaseException e) { @@ -374,9 +378,9 @@ shutdown = true; // shutdown the connect thread if (myConnectThread != null) if (connectThread != null) { myConnectThread.interrupt(); connectThread.interrupt(); } // shutdown the listener thread @@ -469,7 +473,61 @@ public ConfigChangeResult applyConfigurationChange( ReplicationServerCfg configuration) { // TODO : implement this // Changing those properties don't need specific code. // They will be applied for next connections. replicationServers = configuration.getReplicationServer(); if (replicationServers == null) replicationServers = new ArrayList<String>(); queueSize = configuration.getQueueSize(); trimAge = configuration.getReplicationPurgeDelay(); rcvWindow = configuration.getWindowSize(); // changing the listen port requires to stop the listen thread // and restart it. int newPort = configuration.getReplicationPort(); if (newPort != replicationPort) { stopListen = true; try { listenSocket.close(); listenThread.join(); stopListen = false; replicationPort = newPort; String localhostname = InetAddress.getLocalHost().getHostName(); String localAdddress = InetAddress.getLocalHost().getHostAddress(); serverURL = localhostname + ":" + String.valueOf(replicationPort); localURL = localAdddress + ":" + String.valueOf(replicationPort); listenSocket = new ServerSocket(); listenSocket.setReceiveBufferSize(1000000); listenSocket.bind(new InetSocketAddress(replicationPort)); listenThread = new ReplicationServerListenThread( "Replication Server Listener", this); listenThread.start(); } catch (IOException e) { Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString()); logError(message); new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false); } catch (InterruptedException e) { Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString()); logError(message); new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false); } } if ((configuration.getReplicationDbDirectory() != null) && (dbDirname != configuration.getReplicationDbDirectory())) { return new ConfigChangeResult(ResultCode.SUCCESS, true); } return new ConfigChangeResult(ResultCode.SUCCESS, false); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
New file @@ -0,0 +1,87 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import static org.testng.Assert.assertTrue; import java.net.ServerSocket; import org.opends.server.TestCaseUtils; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.types.DN; import org.testng.annotations.Test; /** * Tests that we can dynamically modify the configuration of replicationServer */ public class ReplicationServerDynamicConfTest extends ReplicationTestCase { /** * That that the applyConfigurationChange methos of the ReplicationServer * class. */ @Test() public void replServerApplyChangeTest() throws Exception { TestCaseUtils.startServer(); // find two free ports for the replication Server port ServerSocket socket1 = TestCaseUtils.bindFreePort(); int replicationServerPort = socket1.getLocalPort(); ServerSocket socket2 = TestCaseUtils.bindFreePort(); int newReplicationServerPort = socket2.getLocalPort(); socket1.close(); socket2.close(); // instantiate a Replication server using the first port number. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration( replicationServerPort, null, 0, 1, 0, 0, null); ReplicationServer replicationServer = new ReplicationServer(conf); // Most of the configuration change are trivial to apply. // The interesting change is the change of the replication server port. // build a new ReplServerFakeConfiguration with a new server port // apply this new configuration and check that it is now possible to // connect to this new portnumber. ReplServerFakeConfiguration newconf = new ReplServerFakeConfiguration( newReplicationServerPort, null, 0, 1, 0, 0, null); replicationServer.applyConfigurationChange(newconf); ReplicationBroker broker = openReplicationSession( DN.decode("dc=example"), (short) 1, 10, newReplicationServerPort, 1000, false); // check that the sendWindow is not null to make sure that the // broker did connect successfully. assertTrue(broker.getCurrentSendWindow() != 0); } }