opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -697,6 +697,7 @@ } } // For late servers } return bestServer; } @@ -1000,7 +1001,7 @@ try { rcvWindow--; if ((rcvWindow < halfRcvWindow) && (session != null)) if (rcvWindow < halfRcvWindow) { session.publish(new WindowMessage(halfRcvWindow)); rcvWindow += halfRcvWindow; @@ -1195,10 +1196,9 @@ this.maxReceiveQueue = maxReceiveQueue; this.maxSendDelay = maxSendDelay; this.maxSendQueue = maxSendQueue; // For info, a new session with the replicationServer // will be recreated in the replication domain // to take into account the new configuration. // TODO : Changing those parameters requires to either restart a new // session with the replicationServer or renegociate the parameters that // were sent in the ServerStart message } /** opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2248,14 +2248,12 @@ disabled = true; // Stop the listener thread if (listenerThread != null) listenerThread.shutdown(); listenerThread.shutdown(); broker.stop(); // This will cut the session and wake up the listener // Wait for the listener thread to stop if (listenerThread != null) listenerThread.waitForShutdown(); listenerThread.waitForShutdown(); } /** @@ -3467,12 +3465,6 @@ maxSendQueue, maxSendDelay, window, heartbeatInterval); isolationpolicy = configuration.getIsolationPolicy(); // To be able to stop and restart the broker properly just // disable and enable the domain. That way a new session // with the new configuration is available. this.disable(); this.enable(); return new ConfigChangeResult(ResultCode.SUCCESS, false); } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -627,13 +627,9 @@ { // Changing those properties don't need specific code. // They will be applied for next connections. disconnectRemovedReplicationServers(configuration.getReplicationServer()); replicationServers = configuration.getReplicationServer(); if (replicationServers == null) replicationServers = new ArrayList<String>(); queueSize = configuration.getQueueSize(); long newPurgeDelay = configuration.getReplicationPurgeDelay(); if (newPurgeDelay != purgeDelay) @@ -1028,30 +1024,4 @@ } } } /** * Compute the list of replication servers that are not any * more connected to this Replication Server and stop the * corresponding handlers. * @param newReplServers the list of the new replication servers configured. */ private void disconnectRemovedReplicationServers( Collection<String> newReplServers) { Collection<String> serversToDisconnect = new ArrayList<String>(); for (String server: replicationServers) { if (!newReplServers.contains(server)) serversToDisconnect.add(server); } if (serversToDisconnect.isEmpty()) return; for (ReplicationServerDomain replicationServerDomain: baseDNs.values()) { replicationServerDomain.stopServers(serversToDisconnect); } } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -331,21 +330,6 @@ } /** * Stop operations with a list of servers. * * @param replServers the replication servers for which * we want to stop operations */ public void stopServers(Collection<String> replServers) { for (ServerHandler handler : replicationServers.values()) { if (replServers.contains(handler.getServerAddressURL())) stopServer(handler); } } /** * Stop operations with a given server. * * @param handler the server for which we want to stop operations opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -45,7 +45,9 @@ import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.ResultCode; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -130,10 +132,9 @@ /** * Test the failover feature when one RS fails: * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology. * DS1 connected to one RS * DS1 connected to RS1 (DS1<->RS1) * Both RS are connected together (RS1<->RS2) * The RS connected to DS1 fails, DS1 should be connected * to the other RS * RS1 fails, DS1 should be connected to RS2 * * @throws Exception If a problem occured */ @@ -141,7 +142,7 @@ public void testFailOverSingle() throws Exception { String testCase = "testFailOverSingle"; int rsPort = -1; debugInfo("Starting " + testCase); initTest(); @@ -157,29 +158,16 @@ // DS1 connected to RS1 ? String msg = "Before " + RS1_ID + " failure"; // Check which replication server is connected to this LDAP server rsPort = findReplServerConnected(rd1); checkConnection(DS1_ID, RS1_ID, msg); if (rsPort == rs1Port) { // Simulate RS1 failure rs1.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); } else { // Simulate RS2 failure rs2.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS1 ? msg = "After " + RS2_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); } // Simulate RS1 failure rs1.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); endTest(); } @@ -221,10 +209,10 @@ rd2 = createReplicationDomain(baseDn, DS2_ID, testCase); // DS1 connected to RS1 ? //String msg = "Before " + RS1_ID + " failure"; //checkConnection(DS1_ID, RS1_ID, msg); String msg = "Before " + RS1_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); // DS2 connected to RS2 ? //checkConnection(DS2_ID, RS1_ID, msg); checkConnection(DS2_ID, RS2_ID, msg); // Simulate RS1 failure rs1.shutdown(); @@ -232,7 +220,7 @@ sleep(5000); // DS1 connected to RS2 ? String msg = "After " + RS1_ID + " failure"; msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); // DS2 connected to RS2 ? checkConnection(DS2_ID, RS2_ID, msg); @@ -407,10 +395,17 @@ SortedSet<String> replServers = new TreeSet<String>(); try { // Create a domain with two replication servers replServers.add("localhost:" + rs1Port); replServers.add("localhost:" + rs2Port); if (serverId == DS1_ID) { replServers.add("localhost:" + rs1Port); } else if (serverId == DS2_ID) { replServers.add("localhost:" + rs2Port); } else { fail("Unknown replication domain server id."); } DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, serverId, replServers); //domainConf.setHeartbeatInterval(500); @@ -418,6 +413,32 @@ MultimasterReplication.createNewDomain(domainConf); replicationDomain.start(); // Add other server (doing that after connection insure we connect to // the right server) // WARNING: only works because for the moment, applying changes to conf // does not force reconnection in replication domain // when it is coded, the reconnect may 1 of both servers and we can not // guaranty anymore that we reach the server we want at the beginning. if (serverId == DS1_ID) { replServers.add("localhost:" + rs2Port); } else if (serverId == DS2_ID) { replServers.add("localhost:" + rs1Port); } else { fail("Unknown replication domain server id."); } domainConf = new DomainFakeCfg(baseDn, serverId, replServers); ConfigChangeResult chgRes = replicationDomain.applyConfigurationChange(domainConf); if ((chgRes == null) || (!chgRes.getResultCode().equals(ResultCode.SUCCESS))) { fail("Could not change replication domain config" + " (add some replication servers)."); } return replicationDomain; } catch (Exception e) @@ -454,21 +475,4 @@ super.classCleanUp(); // In case we need it extend } private int findReplServerConnected(ReplicationDomain rd) { int rsPort = -1; // First check that the Replication domain is connected if (!rd.isConnected()) return rsPort; String serverStr = rd.getReplicationServer(); int index = serverStr.lastIndexOf(':'); if ((index == -1) || (index >= serverStr.length())) fail("Enable to find port number in: " + serverStr); rsPort = (new Integer(serverStr.substring(index + 1))); return rsPort; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -39,7 +39,6 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -133,7 +132,6 @@ TestCaseUtils.dsconfig( "create-replication-server", "--provider-name", "Multimaster Synchronization", "--set", "replication-db-directory:" + "replicationServerTestDb", "--set", "replication-port:" + replicationServerPort, "--set", "replication-server-id:1"); @@ -189,7 +187,6 @@ backupRestore(); stopChangelog(); windowProbeTest(); replicationServerConnected(); } /** @@ -343,8 +340,8 @@ { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1), "The first message received by a new client was the wrong one : " + del.getChangeNumber() + " instead of " + firstChangeNumberServer1); "The first message received by a new client was the wrong one." + del.getChangeNumber() + " " + firstChangeNumberServer1); } } finally @@ -728,7 +725,7 @@ String baseUUID = "22222222-2222-2222-2222-222222222222"; // - Add String lentry = new String("dn: o=test,dc=example,dc=com\n" String lentry = new String("dn: dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n"); Entry entry = TestCaseUtils.entryFromLdifString(lentry); @@ -1532,187 +1529,4 @@ assertEquals(retVal, 53, "Returned error: " + eStream); } catch(Exception e) {} } /** * Replication Server configuration test of the replication Server code with 2 replication servers involved * 2 tests are done here (itest=0 or itest=1) * * Test 1 * - Create replication server 1 * - Create replication server 2 * - Connect replication server 1 to replication server 2 * - Create and connect client 1 to replication server 1 * - Create and connect client 2 to replication server 2 * - Make client1 publish changes * - Check that client 2 receives the changes published by client 1 * Then * - Change the config of replication server 1 to no more be connected * to server 2 * - Make client 1 publish a change * - Check that client 2 does not receive the change */ @Test private void replicationServerConnected() throws Exception { ReplicationBroker broker1 = null; ReplicationBroker broker2 = null; boolean emptyOldChanges = true; // - Create 2 connected replicationServer ReplicationServer[] changelogs = new ReplicationServer[2]; int[] changelogPorts = new int[2]; int[] changelogIds = new int[2]; short[] brokerIds = new short[2]; ServerSocket socket = null; // Find 2 free ports for (int i = 0; i <= 1; i++) { // find a free port socket = TestCaseUtils.bindFreePort(); changelogPorts[i] = socket.getLocalPort(); changelogIds[i] = i + 10; brokerIds[i] = (short) (100+i); socket.close(); } for (int i = 0; i <= 1; i++) { changelogs[i] = null; // create the 2 replicationServer // and connect the first one to the other one SortedSet<String> servers = new TreeSet<String>(); // Connect only replicationServer[0] to ReplicationServer[1] // and not the other way if (i==0) servers.add("localhost:" + changelogPorts[1]); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0, changelogIds[i], 0, 100, servers); changelogs[i] = new ReplicationServer(conf); } try { // Create and connect client1 to changelog1 // and client2 to changelog2 broker1 = openReplicationSession(DN.decode("dc=example,dc=com"), brokerIds[0], 100, changelogPorts[0], 1000, emptyOldChanges); broker2 = openReplicationSession(DN.decode("dc=example,dc=com"), brokerIds[1], 100, changelogPorts[0], 1000, emptyOldChanges); // - Test messages between clients by publishing now long time = TimeThread.getTime(); int ts = 1; ChangeNumber cn; String user1entryUUID = "33333333-3333-3333-3333-333333333333"; String baseUUID = "22222222-2222-2222-2222-222222222222"; // - Add String lentry = new String("dn: o=test,dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: "+ user1entryUUID +"\n"); Entry entry = TestCaseUtils.entryFromLdifString(lentry); cn = new ChangeNumber(time, ts++, brokerIds[0]); AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com", user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry .getAttributes(), new ArrayList<Attribute>()); broker1.publish(addMsg); // - Modify Attribute attr1 = new Attribute("description", "new value"); Modification mod1 = new Modification(ModificationType.REPLACE, attr1); List<Modification> mods = new ArrayList<Modification>(); mods.add(mod1); cn = new ChangeNumber(time, ts++, brokerIds[0]); ModifyMsg modMsg = new ModifyMsg(cn, DN .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid"); broker1.publish(modMsg); // - Check msg received by broker, through changeLog2 while (ts > 1) { ReplicationMessage msg2; try { msg2 = broker2.receive(); if (msg2 == null) break; broker2.updateWindowAfterReplay(); } catch (Exception e) { fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts); break; } if (msg2 instanceof AddMsg) { AddMsg addMsg2 = (AddMsg) msg2; if (addMsg2.toString().equals(addMsg.toString())) ts--; } else if (msg2 instanceof ModifyMsg) { ModifyMsg modMsg2 = (ModifyMsg) msg2; if (modMsg.equals(modMsg2)) ts--; } else { fail("ReplicationServer transmission failed: no expected message" + " class: " + msg2); break; } } // Check that everything expected has been received assertTrue(ts == 1, "Broker2 did not receive the complete set of" + " expected messages: #msg received " + ts); // Then change the config to remove replicationServer[1] from // the configuration of replicationServer[0] SortedSet<String> servers = new TreeSet<String>(); // Configure replicationServer[0] to be disconnected from ReplicationServer[1] ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0, changelogIds[0], 0, 100, servers); changelogs[0].applyConfigurationChange(conf) ; // We expect the receive to end because of a timeout : the link between RS1 & RS2 // should be distroyed by the new configuration // Send 1 update and check that RS[1] does not receive the message after the timeout try { // - Del cn = new ChangeNumber(time, ts++, brokerIds[0]); DeleteMsg delMsg = new DeleteMsg("o=test,dc=example,dc=com", cn, user1entryUUID); broker1.publish(delMsg); broker2.receive(); } catch (SocketTimeoutException soExc) { // the receive fail as expected return; } fail("Broker: receive successed when it should fail. " + "This broker was disconnected by configuration"); } finally { if (changelogs[0] != null) changelogs[0].remove(); if (changelogs[1] != null) changelogs[1].remove(); if (broker1 != null) broker1.stop(); if (broker2 != null) broker2.stop(); } } }