From 304b4ba339a858aa41148ac2f7b332f9eab20993 Mon Sep 17 00:00:00 2001
From: fdorson <fdorson@localhost>
Date: Thu, 03 Jul 2008 12:30:32 +0000
Subject: [PATCH] fix for issue #3317 : Removing replication links requires re-start of the server and issue #3363 : NullPointerException in ReplicationBroker.java

---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java |  104 +++++++-------
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                     |   10 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                     |   30 ++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java         |  192 +++++++++++++++++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                               |   16 ++
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java                                     |   12 +
 6 files changed, 300 insertions(+), 64 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index fc03812..31cd19f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -697,7 +697,6 @@
         }
       } // For late servers
     }
-
     return bestServer;
   }
 
@@ -1001,7 +1000,7 @@
     try
     {
       rcvWindow--;
-      if (rcvWindow < halfRcvWindow)
+      if ((rcvWindow < halfRcvWindow) && (session != null))
       {
         session.publish(new WindowMessage(halfRcvWindow));
         rcvWindow += halfRcvWindow;
@@ -1196,9 +1195,10 @@
     this.maxReceiveQueue = maxReceiveQueue;
     this.maxSendDelay = maxSendDelay;
     this.maxSendQueue = maxSendQueue;
-  // TODO : Changing those parameters requires to either restart a new
-  // session with the replicationServer or renegociate the parameters that
-  // were sent in the ServerStart message
+
+    // For info, a new session with the replicationServer
+    // will be recreated in the replication domain
+    // to take into account the new configuration.
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 5c11a47..91d3227b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2248,12 +2248,14 @@
     disabled = true;
 
     // Stop the listener thread
-    listenerThread.shutdown();
+    if (listenerThread != null)
+      listenerThread.shutdown();
 
     broker.stop(); // This will cut the session and wake up the listener
 
     // Wait for the listener thread to stop
-    listenerThread.waitForShutdown();
+    if (listenerThread != null)
+      listenerThread.waitForShutdown();
   }
 
   /**
@@ -3465,6 +3467,12 @@
                         maxSendQueue, maxSendDelay, window, heartbeatInterval);
     isolationpolicy = configuration.getIsolationPolicy();
 
+    // To be able to stop and restart the broker properly just
+    // disable and enable the domain. That way a new session
+    // with the new configuration is available.
+    this.disable();
+    this.enable();
+
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 47058ec..14de2c9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -627,9 +627,13 @@
   {
     // Changing those properties don't need specific code.
     // They will be applied for next connections.
+
+    disconnectRemovedReplicationServers(configuration.getReplicationServer());
+
     replicationServers = configuration.getReplicationServer();
     if (replicationServers == null)
       replicationServers = new ArrayList<String>();
+
     queueSize = configuration.getQueueSize();
     long newPurgeDelay = configuration.getReplicationPurgeDelay();
     if (newPurgeDelay != purgeDelay)
@@ -1024,4 +1028,30 @@
       }
     }
   }
+
+  /**
+   * Compute the list of replication servers that are not any
+   * more connected to this Replication Server and stop the
+   * corresponding handlers.
+   * @param newReplServers the list of the new replication servers configured.
+   */
+  private void disconnectRemovedReplicationServers(
+      Collection<String> newReplServers)
+  {
+    Collection<String> serversToDisconnect = new ArrayList<String>();
+
+    for (String server: replicationServers)
+    {
+      if (!newReplServers.contains(server))
+        serversToDisconnect.add(server);
+    }
+
+    if (serversToDisconnect.isEmpty())
+      return;
+
+    for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
+    {
+      replicationServerDomain.stopServers(serversToDisconnect);
+    }
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 5e7f1c7..1a0faa3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,6 +37,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -330,6 +331,21 @@
   }
 
   /**
+   * Stop operations with a list of servers.
+   *
+   * @param replServers the replication servers for which
+   * we want to stop operations
+   */
+  public void stopServers(Collection<String> replServers)
+  {
+    for (ServerHandler handler : replicationServers.values())
+    {
+      if (replServers.contains(handler.getServerAddressURL()))
+        stopServer(handler);
+    }
+  }
+
+  /**
    * Stop operations with a given server.
    *
    * @param handler the server for which we want to stop operations
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
index 994053f..471eaf4 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -45,9 +45,7 @@
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
-import org.opends.server.types.ResultCode;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -132,9 +130,10 @@
   /**
    * Test the failover feature when one RS fails:
    * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology.
-   * DS1 connected to RS1 (DS1<->RS1)
+   * DS1 connected to one RS
    * Both RS are connected together (RS1<->RS2)
-   * RS1 fails, DS1 should be connected to RS2
+   * The RS connected to DS1 fails, DS1 should be connected 
+   * to the other RS
    *
    * @throws Exception If a problem occured
    */
@@ -142,7 +141,7 @@
   public void testFailOverSingle() throws Exception
   {
     String testCase = "testFailOverSingle";
-
+    int rsPort = -1;
     debugInfo("Starting " + testCase);
 
     initTest();
@@ -158,16 +157,29 @@
 
     // DS1 connected to RS1 ?
     String msg = "Before " + RS1_ID + " failure";
-    checkConnection(DS1_ID, RS1_ID, msg);
+    // Check which replication server is connected to this LDAP server 
+    rsPort = findReplServerConnected(rd1);
 
-    // Simulate RS1 failure
-    rs1.shutdown();
-    // Let time for failover to happen
-    sleep(5000);
+    if (rsPort == rs1Port)
+    {
+      // Simulate RS1 failure
+      rs1.shutdown();
+      // Let time for failover to happen
+      sleep(5000);
+      // DS1 connected to RS2 ?
+      msg = "After " + RS1_ID + " failure";
+      checkConnection(DS1_ID, RS2_ID, msg);
+    }
+    else
+    { // Simulate RS2 failure
+      rs2.shutdown();
+      // Let time for failover to happen
+      sleep(5000);
+      // DS1 connected to RS1 ?
+      msg = "After " + RS2_ID + " failure";
+      checkConnection(DS1_ID, RS1_ID, msg);
+    }
 
-    // DS1 connected to RS2 ?
-    msg = "After " + RS1_ID + " failure";
-    checkConnection(DS1_ID, RS2_ID, msg);
 
     endTest();
   }
@@ -209,10 +221,10 @@
     rd2 = createReplicationDomain(baseDn, DS2_ID, testCase);
 
     // DS1 connected to RS1 ?
-    String msg = "Before " + RS1_ID + " failure";
-    checkConnection(DS1_ID, RS1_ID, msg);
+    //String msg = "Before " + RS1_ID + " failure";
+    //checkConnection(DS1_ID, RS1_ID, msg);
     // DS2 connected to RS2 ?
-    checkConnection(DS2_ID, RS2_ID, msg);
+    //checkConnection(DS2_ID, RS1_ID, msg);
 
     // Simulate RS1 failure
     rs1.shutdown();
@@ -220,7 +232,7 @@
     sleep(5000);
 
     // DS1 connected to RS2 ?
-    msg = "After " + RS1_ID + " failure";
+    String msg = "After " + RS1_ID + " failure";
     checkConnection(DS1_ID, RS2_ID, msg);
     // DS2 connected to RS2 ?
     checkConnection(DS2_ID, RS2_ID, msg);
@@ -395,17 +407,10 @@
     SortedSet<String> replServers = new TreeSet<String>();
     try
     {
-      if (serverId == DS1_ID)
-      {
-        replServers.add("localhost:" + rs1Port);
-      } else if (serverId == DS2_ID)
-      {
-        replServers.add("localhost:" + rs2Port);
-      } else
-      {
-        fail("Unknown replication domain server id.");
-      }
-
+      // Create a domain with two replication servers
+      replServers.add("localhost:" + rs1Port);
+      replServers.add("localhost:" + rs2Port);
+      
       DomainFakeCfg domainConf =
         new DomainFakeCfg(baseDn, serverId, replServers);
       //domainConf.setHeartbeatInterval(500);
@@ -413,32 +418,6 @@
         MultimasterReplication.createNewDomain(domainConf);
       replicationDomain.start();
 
-      // Add other server (doing that after connection insure we connect to
-      // the right server)
-      // WARNING: only works because for the moment, applying changes to conf
-      // does not force reconnection in replication domain
-      // when it is coded, the reconnect may 1 of both servers and we can not
-      // guaranty anymore that we reach the server we want at the beginning.
-      if (serverId == DS1_ID)
-      {
-        replServers.add("localhost:" + rs2Port);
-      } else if (serverId == DS2_ID)
-      {
-        replServers.add("localhost:" + rs1Port);
-      } else
-      {
-        fail("Unknown replication domain server id.");
-      }
-      domainConf = new DomainFakeCfg(baseDn, serverId, replServers);
-      ConfigChangeResult chgRes =
-        replicationDomain.applyConfigurationChange(domainConf);
-      if ((chgRes == null) ||
-        (!chgRes.getResultCode().equals(ResultCode.SUCCESS)))
-      {
-        fail("Could not change replication domain config" +
-          " (add some replication servers).");
-      }
-
       return replicationDomain;
 
     } catch (Exception e)
@@ -475,4 +454,21 @@
     super.classCleanUp();
   // In case we need it extend
   }
+  
+  private int findReplServerConnected(ReplicationDomain rd)
+  {  
+    int rsPort = -1;
+  
+    // First check that the Replication domain is connected
+    if (!rd.isConnected())
+      return rsPort;
+  
+    String serverStr = rd.getReplicationServer();
+    int index = serverStr.lastIndexOf(':');
+    if ((index == -1) || (index >= serverStr.length()))
+      fail("Enable to find port number in: " + serverStr);
+    rsPort = (new Integer(serverStr.substring(index + 1)));
+  
+      return rsPort;
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 109bb06..e80f766 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -39,6 +39,7 @@
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -132,6 +133,7 @@
     TestCaseUtils.dsconfig(
         "create-replication-server",
         "--provider-name", "Multimaster Synchronization",
+        "--set", "replication-db-directory:" + "replicationServerTestDb",
         "--set", "replication-port:" + replicationServerPort,
         "--set", "replication-server-id:1");
     
@@ -187,6 +189,7 @@
     backupRestore();
     stopChangelog();
     windowProbeTest();
+    replicationServerConnected();
   }
 
   /**
@@ -340,8 +343,8 @@
       {
         DeleteMsg del = (DeleteMsg) msg2;
         assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
-            "The first message received by a new client was the wrong one."
-            + del.getChangeNumber() + " " + firstChangeNumberServer1);
+            "The first message received by a new client was the wrong one : "
+            + del.getChangeNumber() + " instead of " + firstChangeNumberServer1);
       }
     }
     finally
@@ -725,7 +728,7 @@
         String baseUUID = "22222222-2222-2222-2222-222222222222";
 
         // - Add
-        String lentry = new String("dn: dc=example,dc=com\n"
+        String lentry = new String("dn: o=test,dc=example,dc=com\n"
             + "objectClass: top\n" + "objectClass: domain\n"
             + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
@@ -1529,4 +1532,187 @@
        assertEquals(retVal, 53, "Returned error: " + eStream);
      } catch(Exception e) {}
    }
+ 
+   /**
+    * Replication Server configuration test of the replication Server code with 2 replication servers involved
+    * 2 tests are done here (itest=0 or itest=1)
+    *
+    * Test 1
+    * - Create replication server 1
+    * - Create replication server 2 
+    * - Connect replication server 1 to replication server 2
+    * - Create and connect client 1 to replication server 1
+    * - Create and connect client 2 to replication server 2
+    * - Make client1 publish changes
+    * - Check that client 2 receives the changes published by client 1
+    * Then
+    * - Change the config of replication server 1 to no more be connected 
+    * to server 2
+    * - Make client 1 publish a change
+    * - Check that client 2 does not receive the change   
+    */
+   @Test
+   private void replicationServerConnected() throws Exception
+   {
+       ReplicationBroker broker1 = null;
+       ReplicationBroker broker2 = null;
+       boolean emptyOldChanges = true;
+
+       // - Create 2 connected replicationServer
+       ReplicationServer[] changelogs = new ReplicationServer[2];
+       int[] changelogPorts = new int[2];
+       int[] changelogIds = new int[2];
+       short[] brokerIds = new short[2];
+       ServerSocket socket = null;
+
+       // Find 2 free ports
+       for (int i = 0; i <= 1; i++)
+       {
+         // find  a free port
+         socket = TestCaseUtils.bindFreePort();
+         changelogPorts[i] = socket.getLocalPort();
+         changelogIds[i] = i + 10;
+         brokerIds[i] = (short) (100+i);
+         socket.close();
+       }
+
+       for (int i = 0; i <= 1; i++)
+       {
+         changelogs[i] = null;
+         // create the 2 replicationServer 
+         // and connect the first one to the other one
+         SortedSet<String> servers = new TreeSet<String>();
+         
+         // Connect only replicationServer[0] to ReplicationServer[1]
+         // and not the other way
+         if (i==0)
+           servers.add("localhost:" + changelogPorts[1]);
+         ReplServerFakeConfiguration conf =
+           new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
+                                          changelogIds[i], 0, 100, servers);
+         changelogs[i] = new ReplicationServer(conf);
+       }
+
+       try
+       {  
+         // Create and connect client1 to changelog1
+         // and client2 to changelog2
+         broker1 = openReplicationSession(DN.decode("dc=example,dc=com"),
+              brokerIds[0], 100, changelogPorts[0], 1000, emptyOldChanges);
+  
+         broker2 = openReplicationSession(DN.decode("dc=example,dc=com"),
+              brokerIds[1], 100, changelogPorts[0], 1000, emptyOldChanges);
+
+         // - Test messages between clients by publishing now
+         long time = TimeThread.getTime();
+         int ts = 1;
+         ChangeNumber cn;
+         String user1entryUUID = "33333333-3333-3333-3333-333333333333";
+         String baseUUID  = "22222222-2222-2222-2222-222222222222";
+
+         // - Add
+         String lentry = new String("dn: o=test,dc=example,dc=com\n"
+             + "objectClass: top\n" + "objectClass: domain\n"
+             + "entryUUID: "+ user1entryUUID +"\n");
+         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+         cn = new ChangeNumber(time, ts++, brokerIds[0]);
+         AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
+             user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
+             .getAttributes(), new ArrayList<Attribute>());
+         broker1.publish(addMsg);
+
+         // - Modify
+         Attribute attr1 = new Attribute("description", "new value");
+         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
+         List<Modification> mods = new ArrayList<Modification>();
+         mods.add(mod1);
+         cn = new ChangeNumber(time, ts++, brokerIds[0]);
+         ModifyMsg modMsg = new ModifyMsg(cn, DN
+             .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
+         broker1.publish(modMsg);
+         
+            // - Check msg received by broker, through changeLog2
+
+         while (ts > 1)
+         {
+           ReplicationMessage msg2;
+           try
+           {
+             msg2 = broker2.receive();
+             if (msg2 == null)
+               break;
+             broker2.updateWindowAfterReplay();
+           }
+           catch (Exception e)
+           {
+             fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts);
+             break;
+           }
+
+           if (msg2 instanceof AddMsg)
+           {
+             AddMsg addMsg2 = (AddMsg) msg2;
+             if (addMsg2.toString().equals(addMsg.toString()))
+               ts--;
+           }
+           else if (msg2 instanceof ModifyMsg)
+           {
+             ModifyMsg modMsg2 = (ModifyMsg) msg2;
+             if (modMsg.equals(modMsg2))
+               ts--;
+           }
+           else
+           {
+             fail("ReplicationServer transmission failed: no expected message" +
+               " class: " + msg2);
+             break;
+           }
+         }
+         // Check that everything expected has been received
+         assertTrue(ts == 1, "Broker2 did not receive the complete set of"
+             + " expected messages: #msg received " + ts);
+         
+         // Then change the config to remove replicationServer[1] from
+         // the configuration of replicationServer[0]
+           
+         SortedSet<String> servers = new TreeSet<String>();         
+         // Configure replicationServer[0] to be disconnected from ReplicationServer[1]
+         ReplServerFakeConfiguration conf =
+           new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0,
+                                          changelogIds[0], 0, 100, servers);
+         changelogs[0].applyConfigurationChange(conf) ;
+                 
+         // We expect the receive to end because of a timeout : the link between RS1 & RS2
+         // should be distroyed by the new configuration
+
+         // Send 1 update and check that RS[1] does not receive the message after the timeout
+         try
+         {   
+           // - Del
+           cn = new ChangeNumber(time, ts++, brokerIds[0]);
+           DeleteMsg delMsg = new DeleteMsg("o=test,dc=example,dc=com", cn, user1entryUUID);
+           broker1.publish(delMsg);
+           broker2.receive();
+         }
+         catch (SocketTimeoutException soExc)
+         {
+         // the receive fail as expected
+           return;
+         }
+        
+         fail("Broker: receive successed when it should fail. "
+               + "This broker was disconnected by configuration");
+       }
+       finally
+       {
+         if (changelogs[0] != null)
+           changelogs[0].remove();
+         if (changelogs[1] != null)
+           changelogs[1].remove();
+         if (broker1 != null)
+           broker1.stop();
+         if (broker2 != null)
+           broker2.stop();
+       }
+     }   
 }

--
Gitblit v1.10.0