mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
22.56.2007 84e7776c6574bff8d8ac49d3c08a7701520de281
issue 2119 : replication servers do not connect to each other when using setup

When using the setup to configure 2 servers in multimaster mode.
The setup first creates the configuration of the first server and start it
When the second setup is run to configure the second server, the setup updates
the configuration of the replication-server on the first server, however the code for handling this dynamic configuration was not yet written and this
information is not taken into account.

The replication servers are therefore not connected and the replication server
on the new host therefore does not get the updates.

These changes implement the dynamic configuration of replication servers so that
the same scenario works fine.

The only properties that is now not dynamically configurable is the
path of the changelog database.

A new test has been added for these changes.
I've also tested manually that this does fix the replication server problem
after configuration using the setup program.
1 files added
2 files modified
177 ■■■■■ changed files
opends/src/messages/messages/replication.properties 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 82 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java 87 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -187,4 +187,10 @@
MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \
 information for attribute %s which is not defined in the schema. This \
 information will be ignored
NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \
 be closed : %s
SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \
 replication server port could not be stopped : %s
DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \
 listening on the replication port
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -80,8 +80,8 @@
  private String serverURL;
  private ServerSocket listenSocket;
  private Thread myListenThread;
  private Thread myConnectThread;
  private Thread listenThread;
  private Thread connectThread;
  /* The list of replication servers configured by the administrator */
  private Collection<String> replicationServers;
@@ -102,6 +102,7 @@
  private long trimAge; // the time (in sec) after which the  changes must
  private int replicationPort;
                        // de deleted from the persistent storage.
  private boolean stopListen = false;
  /**
   * Creates a new Replication server using the provided configuration entry.
@@ -164,7 +165,7 @@
  void runListen()
  {
    Socket newSocket = null;
    while (shutdown == false)
    while ((shutdown == false) && (stopListen  == false))
    {
      // Wait on the replicationServer port.
      // Read incoming messages and create LDAP or ReplicationServer listener
@@ -181,8 +182,11 @@
        handler.start(null, serverId, serverURL, rcvWindow, this);
      } catch (IOException e)
      {
        // ignore
        // TODO add some logging to allow problem debugging
        // The socket has probably been closed as part of the
        // shutdown or changing the port number process.
        // just log debug information and loop.
        Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
        logError(message);
      }
    }
  }
@@ -318,12 +322,12 @@
      /*
       * create working threads
       */
      myListenThread =
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener", this);
      myListenThread.start();
      myConnectThread =
      listenThread.start();
      connectThread =
        new ReplicationServerConnectThread("Replication Server Connect", this);
      myConnectThread.start();
      connectThread.start();
    } catch (DatabaseException e)
    {
@@ -374,9 +378,9 @@
    shutdown = true;
    // shutdown the connect thread
    if (myConnectThread != null)
    if (connectThread != null)
    {
      myConnectThread.interrupt();
      connectThread.interrupt();
    }
    // shutdown the listener thread
@@ -469,7 +473,61 @@
  public ConfigChangeResult applyConfigurationChange(
      ReplicationServerCfg configuration)
  {
    // TODO : implement this
    // Changing those properties don't need specific code.
    // They will be applied for next connections.
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    trimAge = configuration.getReplicationPurgeDelay();
    rcvWindow = configuration.getWindowSize();
    // changing the listen port requires to stop the listen thread
    // and restart it.
    int newPort = configuration.getReplicationPort();
    if (newPort != replicationPort)
    {
      stopListen = true;
      try
      {
        listenSocket.close();
        listenThread.join();
        stopListen = false;
        replicationPort = newPort;
        String localhostname = InetAddress.getLocalHost().getHostName();
        String localAdddress = InetAddress.getLocalHost().getHostAddress();
        serverURL = localhostname + ":" + String.valueOf(replicationPort);
        localURL = localAdddress + ":" + String.valueOf(replicationPort);
        listenSocket = new ServerSocket();
        listenSocket.setReceiveBufferSize(1000000);
        listenSocket.bind(new InetSocketAddress(replicationPort));
        listenThread =
          new ReplicationServerListenThread(
              "Replication Server Listener", this);
        listenThread.start();
      }
      catch (IOException e)
      {
        Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString());
        logError(message);
        new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
      }
      catch (InterruptedException e)
      {
        Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString());
        logError(message);
        new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
      }
    }
    if ((configuration.getReplicationDbDirectory() != null) &&
        (dbDirname != configuration.getReplicationDbDirectory()))
    {
      return new ConfigChangeResult(ResultCode.SUCCESS, true);
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
New file
@@ -0,0 +1,87 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.testng.Assert.assertTrue;
import java.net.ServerSocket;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.types.DN;
import org.testng.annotations.Test;
/**
 * Tests that we can dynamically modify the configuration of replicationServer
 */
public class ReplicationServerDynamicConfTest extends ReplicationTestCase
{
  /**
   * That that the applyConfigurationChange methos of the ReplicationServer
   * class.
   */
  @Test()
  public void replServerApplyChangeTest() throws Exception
  {
    TestCaseUtils.startServer();
    // find two free ports for the replication Server port
    ServerSocket socket1 = TestCaseUtils.bindFreePort();
    int replicationServerPort = socket1.getLocalPort();
    ServerSocket socket2 = TestCaseUtils.bindFreePort();
    int newReplicationServerPort = socket2.getLocalPort();
    socket1.close();
    socket2.close();
    // instantiate a Replication server using the first port number.
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(
          replicationServerPort, null, 0, 1, 0, 0, null);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    // Most of the configuration change are trivial to apply.
    // The interesting change is the change of the replication server port.
    // build a new ReplServerFakeConfiguration with a new server port
    // apply this new configuration and check that it is now possible to
    // connect to this new portnumber.
    ReplServerFakeConfiguration newconf =
      new ReplServerFakeConfiguration(
          newReplicationServerPort, null, 0, 1, 0, 0, null);
    replicationServer.applyConfigurationChange(newconf);
    ReplicationBroker broker = openReplicationSession(
        DN.decode("dc=example"), (short) 1, 10, newReplicationServerPort,
        1000, false);
    // check that the sendWindow is not null to make sure that the
    // broker did connect successfully.
    assertTrue(broker.getCurrentSendWindow() != 0);
  }
}