opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -697,7 +697,6 @@ } } // For late servers } return bestServer; } @@ -1001,7 +1000,7 @@ try { rcvWindow--; if (rcvWindow < halfRcvWindow) if ((rcvWindow < halfRcvWindow) && (session != null)) { session.publish(new WindowMessage(halfRcvWindow)); rcvWindow += halfRcvWindow; @@ -1196,9 +1195,10 @@ this.maxReceiveQueue = maxReceiveQueue; this.maxSendDelay = maxSendDelay; this.maxSendQueue = maxSendQueue; // TODO : Changing those parameters requires to either restart a new // session with the replicationServer or renegociate the parameters that // were sent in the ServerStart message // For info, a new session with the replicationServer // will be recreated in the replication domain // to take into account the new configuration. } /** opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2248,12 +2248,14 @@ disabled = true; // Stop the listener thread listenerThread.shutdown(); if (listenerThread != null) listenerThread.shutdown(); broker.stop(); // This will cut the session and wake up the listener // Wait for the listener thread to stop listenerThread.waitForShutdown(); if (listenerThread != null) listenerThread.waitForShutdown(); } /** @@ -3456,15 +3458,35 @@ // server id and base dn are readonly. // isolationPolicy can be set immediately and will apply // to the next updates. // The other parameters needs to be renegociated with the ReplicationServer. // The other parameters needs to be renegociated with the ReplicationServer // so that requires restarting the session with the ReplicationServer. replicationServers = configuration.getReplicationServer(); Boolean needToRestartSession = false; Collection<String> newReplServers = configuration.getReplicationServer(); // A new session is necessary only when information regarding // the connection is modified if ((!(replicationServers.size() == newReplServers.size() && replicationServers.containsAll(newReplServers))) || window != configuration.getWindowSize() || heartbeatInterval != configuration.getHeartbeatInterval()) needToRestartSession = true; replicationServers = newReplServers; window = configuration.getWindowSize(); heartbeatInterval = configuration.getHeartbeatInterval(); broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window, heartbeatInterval); maxSendQueue, maxSendDelay, window, heartbeatInterval); isolationpolicy = configuration.getIsolationPolicy(); // To be able to stop and restart the broker properly just // disable and enable the domain. That way a new session // with the new configuration is available. if (needToRestartSession) { this.disable(); this.enable(); } return new ConfigChangeResult(ResultCode.SUCCESS, false); } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -627,9 +627,13 @@ { // Changing those properties don't need specific code. // They will be applied for next connections. disconnectRemovedReplicationServers(configuration.getReplicationServer()); replicationServers = configuration.getReplicationServer(); if (replicationServers == null) replicationServers = new ArrayList<String>(); queueSize = configuration.getQueueSize(); long newPurgeDelay = configuration.getReplicationPurgeDelay(); if (newPurgeDelay != purgeDelay) @@ -1024,4 +1028,30 @@ } } } /** * Compute the list of replication servers that are not any * more connected to this Replication Server and stop the * corresponding handlers. * @param newReplServers the list of the new replication servers configured. */ private void disconnectRemovedReplicationServers( Collection<String> newReplServers) { Collection<String> serversToDisconnect = new ArrayList<String>(); for (String server: replicationServers) { if (!newReplServers.contains(server)) serversToDisconnect.add(server); } if (serversToDisconnect.isEmpty()) return; for (ReplicationServerDomain replicationServerDomain: baseDNs.values()) { replicationServerDomain.stopServers(serversToDisconnect); } } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,6 +37,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -330,6 +331,21 @@ } /** * Stop operations with a list of servers. * * @param replServers the replication servers for which * we want to stop operations */ public void stopServers(Collection<String> replServers) { for (ServerHandler handler : replicationServers.values()) { if (replServers.contains(handler.getServerAddressURL())) stopServer(handler); } } /** * Stop operations with a given server. * * @param handler the server for which we want to stop operations opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -45,9 +45,7 @@ import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.ResultCode; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -132,9 +130,10 @@ /** * Test the failover feature when one RS fails: * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology. * DS1 connected to RS1 (DS1<->RS1) * DS1 connected to one RS * Both RS are connected together (RS1<->RS2) * RS1 fails, DS1 should be connected to RS2 * The RS connected to DS1 fails, DS1 should be connected * to the other RS * * @throws Exception If a problem occured */ @@ -142,7 +141,7 @@ public void testFailOverSingle() throws Exception { String testCase = "testFailOverSingle"; int rsPort = -1; debugInfo("Starting " + testCase); initTest(); @@ -158,16 +157,29 @@ // DS1 connected to RS1 ? String msg = "Before " + RS1_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); // Check which replication server is connected to this LDAP server rsPort = findReplServerConnected(rd1); // Simulate RS1 failure rs1.shutdown(); // Let time for failover to happen sleep(5000); if (rsPort == rs1Port) { // Simulate RS1 failure rs1.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); } else { // Simulate RS2 failure rs2.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS1 ? msg = "After " + RS2_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); } // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); endTest(); } @@ -209,10 +221,10 @@ rd2 = createReplicationDomain(baseDn, DS2_ID, testCase); // DS1 connected to RS1 ? String msg = "Before " + RS1_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); //String msg = "Before " + RS1_ID + " failure"; //checkConnection(DS1_ID, RS1_ID, msg); // DS2 connected to RS2 ? checkConnection(DS2_ID, RS2_ID, msg); //checkConnection(DS2_ID, RS1_ID, msg); // Simulate RS1 failure rs1.shutdown(); @@ -220,7 +232,7 @@ sleep(5000); // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; String msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); // DS2 connected to RS2 ? checkConnection(DS2_ID, RS2_ID, msg); @@ -395,17 +407,10 @@ SortedSet<String> replServers = new TreeSet<String>(); try { if (serverId == DS1_ID) { replServers.add("localhost:" + rs1Port); } else if (serverId == DS2_ID) { replServers.add("localhost:" + rs2Port); } else { fail("Unknown replication domain server id."); } // Create a domain with two replication servers replServers.add("localhost:" + rs1Port); replServers.add("localhost:" + rs2Port); DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, serverId, replServers); //domainConf.setHeartbeatInterval(500); @@ -413,32 +418,6 @@ MultimasterReplication.createNewDomain(domainConf); replicationDomain.start(); // Add other server (doing that after connection insure we connect to // the right server) // WARNING: only works because for the moment, applying changes to conf // does not force reconnection in replication domain // when it is coded, the reconnect may 1 of both servers and we can not // guaranty anymore that we reach the server we want at the beginning. if (serverId == DS1_ID) { replServers.add("localhost:" + rs2Port); } else if (serverId == DS2_ID) { replServers.add("localhost:" + rs1Port); } else { fail("Unknown replication domain server id."); } domainConf = new DomainFakeCfg(baseDn, serverId, replServers); ConfigChangeResult chgRes = replicationDomain.applyConfigurationChange(domainConf); if ((chgRes == null) || (!chgRes.getResultCode().equals(ResultCode.SUCCESS))) { fail("Could not change replication domain config" + " (add some replication servers)."); } return replicationDomain; } catch (Exception e) @@ -475,4 +454,21 @@ super.classCleanUp(); // In case we need it extend } private int findReplServerConnected(ReplicationDomain rd) { int rsPort = -1; // First check that the Replication domain is connected if (!rd.isConnected()) return rsPort; String serverStr = rd.getReplicationServer(); int index = serverStr.lastIndexOf(':'); if ((index == -1) || (index >= serverStr.length())) fail("Enable to find port number in: " + serverStr); rsPort = (new Integer(serverStr.substring(index + 1))); return rsPort; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -39,6 +39,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -132,6 +133,7 @@ TestCaseUtils.dsconfig( "create-replication-server", "--provider-name", "Multimaster Synchronization", "--set", "replication-db-directory:" + "replicationServerTestDb", "--set", "replication-port:" + replicationServerPort, "--set", "replication-server-id:1"); @@ -187,6 +189,7 @@ backupRestore(); stopChangelog(); windowProbeTest(); replicationServerConnected(); } /** @@ -340,8 +343,8 @@ { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1), "The first message received by a new client was the wrong one." + del.getChangeNumber() + " " + firstChangeNumberServer1); "The first message received by a new client was the wrong one : " + del.getChangeNumber() + " instead of " + firstChangeNumberServer1); } } finally @@ -725,7 +728,7 @@ String baseUUID = "22222222-2222-2222-2222-222222222222"; // - Add String lentry = new String("dn: dc=example,dc=com\n" String lentry = new String("dn: o=test,dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n"); Entry entry = TestCaseUtils.entryFromLdifString(lentry); @@ -1529,4 +1532,187 @@ assertEquals(retVal, 53, "Returned error: " + eStream); } catch(Exception e) {} } /** * Replication Server configuration test of the replication Server code with 2 replication servers involved * 2 tests are done here (itest=0 or itest=1) * * Test 1 * - Create replication server 1 * - Create replication server 2 * - Connect replication server 1 to replication server 2 * - Create and connect client 1 to replication server 1 * - Create and connect client 2 to replication server 2 * - Make client1 publish changes * - Check that client 2 receives the changes published by client 1 * Then * - Change the config of replication server 1 to no more be connected * to server 2 * - Make client 1 publish a change * - Check that client 2 does not receive the change */ @Test private void replicationServerConnected() throws Exception { ReplicationBroker broker1 = null; ReplicationBroker broker2 = null; boolean emptyOldChanges = true; // - Create 2 connected replicationServer ReplicationServer[] changelogs = new ReplicationServer[2]; int[] changelogPorts = new int[2]; int[] changelogIds = new int[2]; short[] brokerIds = new short[2]; ServerSocket socket = null; // Find 2 free ports for (int i = 0; i <= 1; i++) { // find a free port socket = TestCaseUtils.bindFreePort(); changelogPorts[i] = socket.getLocalPort(); changelogIds[i] = i + 10; brokerIds[i] = (short) (100+i); socket.close(); } for (int i = 0; i <= 1; i++) { changelogs[i] = null; // create the 2 replicationServer // and connect the first one to the other one SortedSet<String> servers = new TreeSet<String>(); // Connect only replicationServer[0] to ReplicationServer[1] // and not the other way if (i==0) servers.add("localhost:" + changelogPorts[1]); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0, changelogIds[i], 0, 100, servers); changelogs[i] = new ReplicationServer(conf); } try { // Create and connect client1 to changelog1 // and client2 to changelog2 broker1 = openReplicationSession(DN.decode("dc=example,dc=com"), brokerIds[0], 100, changelogPorts[0], 1000, emptyOldChanges); broker2 = openReplicationSession(DN.decode("dc=example,dc=com"), brokerIds[1], 100, changelogPorts[0], 1000, emptyOldChanges); // - Test messages between clients by publishing now long time = TimeThread.getTime(); int ts = 1; ChangeNumber cn; String user1entryUUID = "33333333-3333-3333-3333-333333333333"; String baseUUID = "22222222-2222-2222-2222-222222222222"; // - Add String lentry = new String("dn: o=test,dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: "+ user1entryUUID +"\n"); Entry entry = TestCaseUtils.entryFromLdifString(lentry); cn = new ChangeNumber(time, ts++, brokerIds[0]); AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com", user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry .getAttributes(), new ArrayList<Attribute>()); broker1.publish(addMsg); // - Modify Attribute attr1 = new Attribute("description", "new value"); Modification mod1 = new Modification(ModificationType.REPLACE, attr1); List<Modification> mods = new ArrayList<Modification>(); mods.add(mod1); cn = new ChangeNumber(time, ts++, brokerIds[0]); ModifyMsg modMsg = new ModifyMsg(cn, DN .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid"); broker1.publish(modMsg); // - Check msg received by broker, through changeLog2 while (ts > 1) { ReplicationMessage msg2; try { msg2 = broker2.receive(); if (msg2 == null) break; broker2.updateWindowAfterReplay(); } catch (Exception e) { fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts); break; } if (msg2 instanceof AddMsg) { AddMsg addMsg2 = (AddMsg) msg2; if (addMsg2.toString().equals(addMsg.toString())) ts--; } else if (msg2 instanceof ModifyMsg) { ModifyMsg modMsg2 = (ModifyMsg) msg2; if (modMsg.equals(modMsg2)) ts--; } else { fail("ReplicationServer transmission failed: no expected message" + " class: " + msg2); break; } } // Check that everything expected has been received assertTrue(ts == 1, "Broker2 did not receive the complete set of" + " expected messages: #msg received " + ts); // Then change the config to remove replicationServer[1] from // the configuration of replicationServer[0] SortedSet<String> servers = new TreeSet<String>(); // Configure replicationServer[0] to be disconnected from ReplicationServer[1] ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0, changelogIds[0], 0, 100, servers); changelogs[0].applyConfigurationChange(conf) ; // We expect the receive to end because of a timeout : the link between RS1 & RS2 // should be distroyed by the new configuration // Send 1 update and check that RS[1] does not receive the message after the timeout try { // - Del cn = new ChangeNumber(time, ts++, brokerIds[0]); DeleteMsg delMsg = new DeleteMsg("o=test,dc=example,dc=com", cn, user1entryUUID); broker1.publish(delMsg); broker2.receive(); } catch (SocketTimeoutException soExc) { // the receive fail as expected return; } fail("Broker: receive successed when it should fail. " + "This broker was disconnected by configuration"); } finally { if (changelogs[0] != null) changelogs[0].remove(); if (changelogs[1] != null) changelogs[1].remove(); if (broker1 != null) broker1.stop(); if (broker2 != null) broker2.stop(); } } }