From 304b4ba339a858aa41148ac2f7b332f9eab20993 Mon Sep 17 00:00:00 2001
From: fdorson <fdorson@localhost>
Date: Thu, 03 Jul 2008 12:30:32 +0000
Subject: [PATCH] fix for issue #3317 : Removing replication links requires re-start of the server and issue #3363 : NullPointerException in ReplicationBroker.java
---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java | 104 +++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 30 ++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 192 +++++++++++++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 16 ++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 12 +
6 files changed, 300 insertions(+), 64 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index fc03812..31cd19f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -697,7 +697,6 @@
}
} // For late servers
}
-
return bestServer;
}
@@ -1001,7 +1000,7 @@
try
{
rcvWindow--;
- if (rcvWindow < halfRcvWindow)
+ if ((rcvWindow < halfRcvWindow) && (session != null))
{
session.publish(new WindowMessage(halfRcvWindow));
rcvWindow += halfRcvWindow;
@@ -1196,9 +1195,10 @@
this.maxReceiveQueue = maxReceiveQueue;
this.maxSendDelay = maxSendDelay;
this.maxSendQueue = maxSendQueue;
- // TODO : Changing those parameters requires to either restart a new
- // session with the replicationServer or renegociate the parameters that
- // were sent in the ServerStart message
+
+ // For info, a new session with the replicationServer
+ // will be recreated in the replication domain
+ // to take into account the new configuration.
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 5c11a47..91d3227b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2248,12 +2248,14 @@
disabled = true;
// Stop the listener thread
- listenerThread.shutdown();
+ if (listenerThread != null)
+ listenerThread.shutdown();
broker.stop(); // This will cut the session and wake up the listener
// Wait for the listener thread to stop
- listenerThread.waitForShutdown();
+ if (listenerThread != null)
+ listenerThread.waitForShutdown();
}
/**
@@ -3465,6 +3467,12 @@
maxSendQueue, maxSendDelay, window, heartbeatInterval);
isolationpolicy = configuration.getIsolationPolicy();
+ // To be able to stop and restart the broker properly just
+ // disable and enable the domain. That way a new session
+ // with the new configuration is available.
+ this.disable();
+ this.enable();
+
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 47058ec..14de2c9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -627,9 +627,13 @@
{
// Changing those properties don't need specific code.
// They will be applied for next connections.
+
+ disconnectRemovedReplicationServers(configuration.getReplicationServer());
+
replicationServers = configuration.getReplicationServer();
if (replicationServers == null)
replicationServers = new ArrayList<String>();
+
queueSize = configuration.getQueueSize();
long newPurgeDelay = configuration.getReplicationPurgeDelay();
if (newPurgeDelay != purgeDelay)
@@ -1024,4 +1028,30 @@
}
}
}
+
+ /**
+ * Compute the list of replication servers that are not any
+ * more connected to this Replication Server and stop the
+ * corresponding handlers.
+ * @param newReplServers the list of the new replication servers configured.
+ */
+ private void disconnectRemovedReplicationServers(
+ Collection<String> newReplServers)
+ {
+ Collection<String> serversToDisconnect = new ArrayList<String>();
+
+ for (String server: replicationServers)
+ {
+ if (!newReplServers.contains(server))
+ serversToDisconnect.add(server);
+ }
+
+ if (serversToDisconnect.isEmpty())
+ return;
+
+ for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
+ {
+ replicationServerDomain.stopServers(serversToDisconnect);
+ }
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 5e7f1c7..1a0faa3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -330,6 +331,21 @@
}
/**
+ * Stop operations with a list of servers.
+ *
+ * @param replServers the replication servers for which
+ * we want to stop operations
+ */
+ public void stopServers(Collection<String> replServers)
+ {
+ for (ServerHandler handler : replicationServers.values())
+ {
+ if (replServers.contains(handler.getServerAddressURL()))
+ stopServer(handler);
+ }
+ }
+
+ /**
* Stop operations with a given server.
*
* @param handler the server for which we want to stop operations
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
index 994053f..471eaf4 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -45,9 +45,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
-import org.opends.server.types.ResultCode;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -132,9 +130,10 @@
/**
* Test the failover feature when one RS fails:
* 1 DS (DS1) and 2 RS (RS1 and RS2) in topology.
- * DS1 connected to RS1 (DS1<->RS1)
+ * DS1 connected to one RS
* Both RS are connected together (RS1<->RS2)
- * RS1 fails, DS1 should be connected to RS2
+ * The RS connected to DS1 fails, DS1 should be connected
+ * to the other RS
*
* @throws Exception If a problem occured
*/
@@ -142,7 +141,7 @@
public void testFailOverSingle() throws Exception
{
String testCase = "testFailOverSingle";
-
+ int rsPort = -1;
debugInfo("Starting " + testCase);
initTest();
@@ -158,16 +157,29 @@
// DS1 connected to RS1 ?
String msg = "Before " + RS1_ID + " failure";
- checkConnection(DS1_ID, RS1_ID, msg);
+ // Check which replication server is connected to this LDAP server
+ rsPort = findReplServerConnected(rd1);
- // Simulate RS1 failure
- rs1.shutdown();
- // Let time for failover to happen
- sleep(5000);
+ if (rsPort == rs1Port)
+ {
+ // Simulate RS1 failure
+ rs1.shutdown();
+ // Let time for failover to happen
+ sleep(5000);
+ // DS1 connected to RS2 ?
+ msg = "After " + RS1_ID + " failure";
+ checkConnection(DS1_ID, RS2_ID, msg);
+ }
+ else
+ { // Simulate RS2 failure
+ rs2.shutdown();
+ // Let time for failover to happen
+ sleep(5000);
+ // DS1 connected to RS1 ?
+ msg = "After " + RS2_ID + " failure";
+ checkConnection(DS1_ID, RS1_ID, msg);
+ }
- // DS1 connected to RS2 ?
- msg = "After " + RS1_ID + " failure";
- checkConnection(DS1_ID, RS2_ID, msg);
endTest();
}
@@ -209,10 +221,10 @@
rd2 = createReplicationDomain(baseDn, DS2_ID, testCase);
// DS1 connected to RS1 ?
- String msg = "Before " + RS1_ID + " failure";
- checkConnection(DS1_ID, RS1_ID, msg);
+ //String msg = "Before " + RS1_ID + " failure";
+ //checkConnection(DS1_ID, RS1_ID, msg);
// DS2 connected to RS2 ?
- checkConnection(DS2_ID, RS2_ID, msg);
+ //checkConnection(DS2_ID, RS1_ID, msg);
// Simulate RS1 failure
rs1.shutdown();
@@ -220,7 +232,7 @@
sleep(5000);
// DS1 connected to RS2 ?
- msg = "After " + RS1_ID + " failure";
+ String msg = "After " + RS1_ID + " failure";
checkConnection(DS1_ID, RS2_ID, msg);
// DS2 connected to RS2 ?
checkConnection(DS2_ID, RS2_ID, msg);
@@ -395,17 +407,10 @@
SortedSet<String> replServers = new TreeSet<String>();
try
{
- if (serverId == DS1_ID)
- {
- replServers.add("localhost:" + rs1Port);
- } else if (serverId == DS2_ID)
- {
- replServers.add("localhost:" + rs2Port);
- } else
- {
- fail("Unknown replication domain server id.");
- }
-
+ // Create a domain with two replication servers
+ replServers.add("localhost:" + rs1Port);
+ replServers.add("localhost:" + rs2Port);
+
DomainFakeCfg domainConf =
new DomainFakeCfg(baseDn, serverId, replServers);
//domainConf.setHeartbeatInterval(500);
@@ -413,32 +418,6 @@
MultimasterReplication.createNewDomain(domainConf);
replicationDomain.start();
- // Add other server (doing that after connection insure we connect to
- // the right server)
- // WARNING: only works because for the moment, applying changes to conf
- // does not force reconnection in replication domain
- // when it is coded, the reconnect may 1 of both servers and we can not
- // guaranty anymore that we reach the server we want at the beginning.
- if (serverId == DS1_ID)
- {
- replServers.add("localhost:" + rs2Port);
- } else if (serverId == DS2_ID)
- {
- replServers.add("localhost:" + rs1Port);
- } else
- {
- fail("Unknown replication domain server id.");
- }
- domainConf = new DomainFakeCfg(baseDn, serverId, replServers);
- ConfigChangeResult chgRes =
- replicationDomain.applyConfigurationChange(domainConf);
- if ((chgRes == null) ||
- (!chgRes.getResultCode().equals(ResultCode.SUCCESS)))
- {
- fail("Could not change replication domain config" +
- " (add some replication servers).");
- }
-
return replicationDomain;
} catch (Exception e)
@@ -475,4 +454,21 @@
super.classCleanUp();
// In case we need it extend
}
+
+ private int findReplServerConnected(ReplicationDomain rd)
+ {
+ int rsPort = -1;
+
+ // First check that the Replication domain is connected
+ if (!rd.isConnected())
+ return rsPort;
+
+ String serverStr = rd.getReplicationServer();
+ int index = serverStr.lastIndexOf(':');
+ if ((index == -1) || (index >= serverStr.length()))
+ fail("Enable to find port number in: " + serverStr);
+ rsPort = (new Integer(serverStr.substring(index + 1)));
+
+ return rsPort;
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 109bb06..e80f766 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -39,6 +39,7 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -132,6 +133,7 @@
TestCaseUtils.dsconfig(
"create-replication-server",
"--provider-name", "Multimaster Synchronization",
+ "--set", "replication-db-directory:" + "replicationServerTestDb",
"--set", "replication-port:" + replicationServerPort,
"--set", "replication-server-id:1");
@@ -187,6 +189,7 @@
backupRestore();
stopChangelog();
windowProbeTest();
+ replicationServerConnected();
}
/**
@@ -340,8 +343,8 @@
{
DeleteMsg del = (DeleteMsg) msg2;
assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
- "The first message received by a new client was the wrong one."
- + del.getChangeNumber() + " " + firstChangeNumberServer1);
+ "The first message received by a new client was the wrong one : "
+ + del.getChangeNumber() + " instead of " + firstChangeNumberServer1);
}
}
finally
@@ -725,7 +728,7 @@
String baseUUID = "22222222-2222-2222-2222-222222222222";
// - Add
- String lentry = new String("dn: dc=example,dc=com\n"
+ String lentry = new String("dn: o=test,dc=example,dc=com\n"
+ "objectClass: top\n" + "objectClass: domain\n"
+ "entryUUID: 11111111-1111-1111-1111-111111111111\n");
Entry entry = TestCaseUtils.entryFromLdifString(lentry);
@@ -1529,4 +1532,187 @@
assertEquals(retVal, 53, "Returned error: " + eStream);
} catch(Exception e) {}
}
+
+ /**
+ * Replication Server configuration test of the replication Server code with 2 replication servers involved
+ * 2 tests are done here (itest=0 or itest=1)
+ *
+ * Test 1
+ * - Create replication server 1
+ * - Create replication server 2
+ * - Connect replication server 1 to replication server 2
+ * - Create and connect client 1 to replication server 1
+ * - Create and connect client 2 to replication server 2
+ * - Make client1 publish changes
+ * - Check that client 2 receives the changes published by client 1
+ * Then
+ * - Change the config of replication server 1 to no more be connected
+ * to server 2
+ * - Make client 1 publish a change
+ * - Check that client 2 does not receive the change
+ */
+ @Test
+ private void replicationServerConnected() throws Exception
+ {
+ ReplicationBroker broker1 = null;
+ ReplicationBroker broker2 = null;
+ boolean emptyOldChanges = true;
+
+ // - Create 2 connected replicationServer
+ ReplicationServer[] changelogs = new ReplicationServer[2];
+ int[] changelogPorts = new int[2];
+ int[] changelogIds = new int[2];
+ short[] brokerIds = new short[2];
+ ServerSocket socket = null;
+
+ // Find 2 free ports
+ for (int i = 0; i <= 1; i++)
+ {
+ // find a free port
+ socket = TestCaseUtils.bindFreePort();
+ changelogPorts[i] = socket.getLocalPort();
+ changelogIds[i] = i + 10;
+ brokerIds[i] = (short) (100+i);
+ socket.close();
+ }
+
+ for (int i = 0; i <= 1; i++)
+ {
+ changelogs[i] = null;
+ // create the 2 replicationServer
+ // and connect the first one to the other one
+ SortedSet<String> servers = new TreeSet<String>();
+
+ // Connect only replicationServer[0] to ReplicationServer[1]
+ // and not the other way
+ if (i==0)
+ servers.add("localhost:" + changelogPorts[1]);
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
+ changelogIds[i], 0, 100, servers);
+ changelogs[i] = new ReplicationServer(conf);
+ }
+
+ try
+ {
+ // Create and connect client1 to changelog1
+ // and client2 to changelog2
+ broker1 = openReplicationSession(DN.decode("dc=example,dc=com"),
+ brokerIds[0], 100, changelogPorts[0], 1000, emptyOldChanges);
+
+ broker2 = openReplicationSession(DN.decode("dc=example,dc=com"),
+ brokerIds[1], 100, changelogPorts[0], 1000, emptyOldChanges);
+
+ // - Test messages between clients by publishing now
+ long time = TimeThread.getTime();
+ int ts = 1;
+ ChangeNumber cn;
+ String user1entryUUID = "33333333-3333-3333-3333-333333333333";
+ String baseUUID = "22222222-2222-2222-2222-222222222222";
+
+ // - Add
+ String lentry = new String("dn: o=test,dc=example,dc=com\n"
+ + "objectClass: top\n" + "objectClass: domain\n"
+ + "entryUUID: "+ user1entryUUID +"\n");
+ Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+ cn = new ChangeNumber(time, ts++, brokerIds[0]);
+ AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
+ user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
+ .getAttributes(), new ArrayList<Attribute>());
+ broker1.publish(addMsg);
+
+ // - Modify
+ Attribute attr1 = new Attribute("description", "new value");
+ Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(mod1);
+ cn = new ChangeNumber(time, ts++, brokerIds[0]);
+ ModifyMsg modMsg = new ModifyMsg(cn, DN
+ .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
+ broker1.publish(modMsg);
+
+ // - Check msg received by broker, through changeLog2
+
+ while (ts > 1)
+ {
+ ReplicationMessage msg2;
+ try
+ {
+ msg2 = broker2.receive();
+ if (msg2 == null)
+ break;
+ broker2.updateWindowAfterReplay();
+ }
+ catch (Exception e)
+ {
+ fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts);
+ break;
+ }
+
+ if (msg2 instanceof AddMsg)
+ {
+ AddMsg addMsg2 = (AddMsg) msg2;
+ if (addMsg2.toString().equals(addMsg.toString()))
+ ts--;
+ }
+ else if (msg2 instanceof ModifyMsg)
+ {
+ ModifyMsg modMsg2 = (ModifyMsg) msg2;
+ if (modMsg.equals(modMsg2))
+ ts--;
+ }
+ else
+ {
+ fail("ReplicationServer transmission failed: no expected message" +
+ " class: " + msg2);
+ break;
+ }
+ }
+ // Check that everything expected has been received
+ assertTrue(ts == 1, "Broker2 did not receive the complete set of"
+ + " expected messages: #msg received " + ts);
+
+ // Then change the config to remove replicationServer[1] from
+ // the configuration of replicationServer[0]
+
+ SortedSet<String> servers = new TreeSet<String>();
+ // Configure replicationServer[0] to be disconnected from ReplicationServer[1]
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0,
+ changelogIds[0], 0, 100, servers);
+ changelogs[0].applyConfigurationChange(conf) ;
+
+ // We expect the receive to end because of a timeout : the link between RS1 & RS2
+ // should be distroyed by the new configuration
+
+ // Send 1 update and check that RS[1] does not receive the message after the timeout
+ try
+ {
+ // - Del
+ cn = new ChangeNumber(time, ts++, brokerIds[0]);
+ DeleteMsg delMsg = new DeleteMsg("o=test,dc=example,dc=com", cn, user1entryUUID);
+ broker1.publish(delMsg);
+ broker2.receive();
+ }
+ catch (SocketTimeoutException soExc)
+ {
+ // the receive fail as expected
+ return;
+ }
+
+ fail("Broker: receive successed when it should fail. "
+ + "This broker was disconnected by configuration");
+ }
+ finally
+ {
+ if (changelogs[0] != null)
+ changelogs[0].remove();
+ if (changelogs[1] != null)
+ changelogs[1].remove();
+ if (broker1 != null)
+ broker1.stop();
+ if (broker2 != null)
+ broker2.stop();
+ }
+ }
}
--
Gitblit v1.10.0