| | |
| | | */ |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT; |
| | | import static org.testng.Assert.assertTrue; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import java.net.ServerSocket; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyDNOperation; |
| | | import org.opends.server.synchronization.SynchronizationTestCase; |
| | | import org.opends.server.synchronization.common.ChangeNumber; |
| | | import org.opends.server.synchronization.common.ChangeNumberGenerator; |
| | | import org.opends.server.synchronization.common.ServerState; |
| | | import org.opends.server.synchronization.plugin.ChangelogBroker; |
| | | import org.opends.server.synchronization.protocol.AddMsg; |
| | | import org.opends.server.synchronization.protocol.DeleteMsg; |
| | | import org.opends.server.synchronization.protocol.ModifyDNMsg; |
| | | import org.opends.server.synchronization.protocol.ModifyDnContext; |
| | | import org.opends.server.synchronization.protocol.ModifyMsg; |
| | | import org.opends.server.synchronization.protocol.SynchronizationMessage; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.RDN; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Tests for the changelog service code. |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Chaining tests of the changelog code with 2 changelog servers involved |
| | | * 2 tests are done here (itest=0 or itest=1) |
| | | * |
| | | * Test 1 |
| | | * - Create changelog server 1 |
| | | * - Create changelog server 2 connected with changelog server 1 |
| | | * - Create and connect client 1 to changelog server 1 |
| | | * - Create and connect client 2 to changelog server 2 |
| | | * - Make client1 publish changes |
| | | * - Check that client 2 receives the changes published by client 1 |
| | | * |
| | | * Test 2 |
| | | * - Create changelog server 1 |
| | | * - Create and connect client1 to changelog server 1 |
| | | * - Make client1 publish changes |
| | | * - Create changelog server 2 connected with changelog server 1 |
| | | * - Create and connect client 2 to changelog server 2 |
| | | * - Check that client 2 receives the changes published by client 1 |
| | | * |
| | | */ |
| | | @Test(enabled=true) |
| | | public void changelogChaining() throws Exception |
| | | { |
| | | for (int itest = 0; itest <2; itest++) |
| | | { |
| | | ChangelogBroker broker2 = null; |
| | | boolean emptyOldChanges = true; |
| | | |
| | | // - Create 2 connected changelog servers |
| | | Changelog[] changelogs = new Changelog[2]; |
| | | int[] changelogPorts = new int[2]; |
| | | int[] changelogIds = new int[2]; |
| | | short[] brokerIds = new short[2]; |
| | | ServerSocket socket = null; |
| | | |
| | | // Find 2 free ports |
| | | for (int i = 0; i <= 1; i++) |
| | | { |
| | | // find a free port |
| | | socket = TestCaseUtils.bindFreePort(); |
| | | changelogPorts[i] = socket.getLocalPort(); |
| | | changelogIds[i] = i + 10; |
| | | brokerIds[i] = (short) (100+i); |
| | | if ((itest==0) || (i ==0)) |
| | | socket.close(); |
| | | } |
| | | |
| | | for (int i = 0; i <= ((itest == 0) ? 1 : 0); i++) |
| | | { |
| | | changelogs[i] = null; |
| | | |
| | | // for itest=0, create the 2 connected changelog servers |
| | | // for itest=1, create the 1rst changelog server, the second |
| | | // one will be created later |
| | | |
| | | String changelogLdif = "dn: cn=Changelog Server\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" |
| | | + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n" |
| | | + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n" |
| | | + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n" |
| | | + "ds-cfg-window-size: 100" + "\n" |
| | | + "ds-cfg-changelog-db-dirname: changelogDb"+i; |
| | | Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); |
| | | ConfigEntry changelogConfig = new ConfigEntry(tmp, null); |
| | | changelogs[i] = new Changelog(changelogConfig); |
| | | } |
| | | |
| | | ChangelogBroker broker1 = null; |
| | | |
| | | try |
| | | { |
| | | // For itest=0, create and connect client1 to changelog1 |
| | | // and client2 to changelog2 |
| | | // For itest=1, only create and connect client1 to changelog1 |
| | | // client2 will be created later |
| | | broker1 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges); |
| | | |
| | | if (itest == 0) |
| | | { |
| | | broker2 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges); |
| | | } |
| | | |
| | | // - Test messages between clients by publishing now |
| | | |
| | | // - Delete |
| | | long time = TimeThread.getTime(); |
| | | int ts = 1; |
| | | ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | |
| | | DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid"); |
| | | broker1.publish(delMsg); |
| | | |
| | | String user1entryUUID = "33333333-3333-3333-3333-333333333333"; |
| | | String baseUUID = "22222222-2222-2222-2222-222222222222"; |
| | | |
| | | // - Add |
| | | String lentry = new String("dn: dc=example,dc=com\n" |
| | | + "objectClass: top\n" + "objectClass: domain\n" |
| | | + "entryUUID: 11111111-1111-1111-1111-111111111111\n"); |
| | | Entry entry = TestCaseUtils.entryFromLdifString(lentry); |
| | | cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com", |
| | | user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry |
| | | .getAttributes(), new ArrayList<Attribute>()); |
| | | broker1.publish(addMsg); |
| | | |
| | | // - Modify |
| | | Attribute attr1 = new Attribute("description", "new value"); |
| | | Modification mod1 = new Modification(ModificationType.REPLACE, attr1); |
| | | List<Modification> mods = new ArrayList<Modification>(); |
| | | mods.add(mod1); |
| | | cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | ModifyMsg modMsg = new ModifyMsg(cn, DN |
| | | .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid"); |
| | | broker1.publish(modMsg); |
| | | |
| | | // - ModifyDN |
| | | cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN |
| | | .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true, |
| | | null); |
| | | op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn, "uniqueid", |
| | | "newparentId")); |
| | | ModifyDNMsg modDNMsg = new ModifyDNMsg(op); |
| | | broker1.publish(modDNMsg); |
| | | |
| | | if (itest > 0) |
| | | { |
| | | socket.close(); |
| | | String changelogLdif = "dn: cn=Changelog Server\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" |
| | | + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n" |
| | | + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n" |
| | | + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n"; |
| | | Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); |
| | | ConfigEntry changelogConfig = new ConfigEntry(tmp, null); |
| | | changelogs[1] = new Changelog(changelogConfig); |
| | | |
| | | // Connect broker 2 to changelog2 |
| | | broker2 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges); |
| | | } |
| | | |
| | | // - Check msg receives by broker, through changeLog2 |
| | | while (ts > 1) |
| | | { |
| | | SynchronizationMessage msg2; |
| | | try |
| | | { |
| | | msg2 = broker2.receive(); |
| | | if (msg2 == null) |
| | | break; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest); |
| | | break; |
| | | } |
| | | |
| | | if (msg2 instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg delMsg2 = (DeleteMsg) msg2; |
| | | if (delMsg2.toString().equals(delMsg.toString())) |
| | | ts--; |
| | | } |
| | | else if (msg2 instanceof AddMsg) |
| | | { |
| | | AddMsg addMsg2 = (AddMsg) msg2; |
| | | if (addMsg2.toString().equals(addMsg.toString())) |
| | | ts--; |
| | | } |
| | | else if (msg2 instanceof ModifyMsg) |
| | | { |
| | | ModifyMsg modMsg2 = (ModifyMsg) msg2; |
| | | if (modMsg.equals(modMsg2)) |
| | | ts--; |
| | | } |
| | | else if (msg2 instanceof ModifyDNMsg) |
| | | { |
| | | ModifyDNMsg modDNMsg2 = (ModifyDNMsg) msg2; |
| | | if (modDNMsg.equals(modDNMsg2)) |
| | | ts--; |
| | | } |
| | | else |
| | | { |
| | | fail("Changelog transmission failed: no expected message class."); |
| | | break; |
| | | } |
| | | } |
| | | // Check that everything expected has been received |
| | | assertTrue(ts == 1, "Broker2 did not receive the complete set of" |
| | | + " expected messages: #msg received " + ts); |
| | | } |
| | | finally |
| | | { |
| | | if (changelogs[0] != null) |
| | | changelogs[0].shutdown(); |
| | | if (changelogs[1] != null) |
| | | changelogs[1].shutdown(); |
| | | if (broker1 != null) |
| | | broker1.stop(); |
| | | if (broker2 != null) |
| | | broker2.stop(); |
| | | } |
| | | } |
| | | } |
| | | } |