| | |
| | | */ |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT; |
| | | import static org.testng.Assert.assertTrue; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import static org.opends.server.synchronization.protocol.OperationContext.*; |
| | | |
| | | import java.net.ServerSocket; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | |
| | | /** |
| | | * Stress test from client using the ChangelogBroker API |
| | | * to the changelog server. |
| | | * This test allows investigating the behaviour of the |
| | | * Changelog server when it needs to distribute the load of |
| | | * updates from a single LDAP server to a number of LDAP servers. |
| | | * |
| | | * This test is configured for a relatively low stress |
| | | * but can be changed using the TOTAL_MSG and CLIENT_THREADS constants. |
| | | */ |
| | | @Test(enabled=false, groups="slow") |
| | | public void stressFromBrokertoChangelog() throws Exception |
| | | @Test(enabled=true, groups="slow") |
| | | public void oneWriterMultipleReader() throws Exception |
| | | { |
| | | ChangelogBroker server = null; |
| | | int TOTAL_MSG = 1000; // number of messages to send during the test |
| | | int CLIENT_THREADS = 2; // number of threads that will try to read |
| | | // the messages |
| | | ChangeNumberGenerator gen = |
| | | new ChangeNumberGenerator((short)5 , (long) 0); |
| | | |
| | | BrokerReader client[] = new BrokerReader[CLIENT_THREADS]; |
| | | ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS]; |
| | | |
| | | try |
| | | { |
| | |
| | | */ |
| | | server = openChangelogSession( |
| | | DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort, |
| | | 1000, true); |
| | | 1000, 1000, 0, true); |
| | | |
| | | BrokerReader reader = new BrokerReader(server); |
| | | |
| | | /* |
| | | * Start the client threads. |
| | | */ |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | | clientBroker[i] = openChangelogSession( |
| | | DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort, |
| | | 1000, true); |
| | | client[i] = new BrokerReader(clientBroker[i]); |
| | | } |
| | | |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | | client[i].start(); |
| | | } |
| | | reader.start(); |
| | | |
| | | ChangeNumberGenerator gen = |
| | | new ChangeNumberGenerator((short)5 , (long) 0); |
| | | /* |
| | | * Simple loop creating changes and sending them |
| | | * to the changelog server. |
| | | */ |
| | | for (int i = 0; i< 100000; i++) |
| | | for (int i = 0; i< TOTAL_MSG; i++) |
| | | { |
| | | DeleteMsg msg = |
| | | new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(), |
| | | "uid"); |
| | | server.publish(msg); |
| | | } |
| | | |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | | client[i].join(); |
| | | reader.join(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | if (server != null) |
| | | server.stop(); |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | | clientBroker[i].stop(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stress test from client using the ChangelogBroker API |
| | | * to the changelog server. |
| | | * |
| | | * This test allows investigating the behaviour of the |
| | | * Changelog server when it needs to distribute the load of |
| | | * updates from multiple LDAP servers to a number of LDAP servers. |
| | | * |
| | | * This test is configured for a relatively low stress |
| | | * but can be changed using the TOTAL_MSG and THREADS constants. |
| | | */ |
| | | @Test(enabled=false, groups="slow") |
| | | public void multipleWriterMultipleReader() throws Exception |
| | | { |
| | | ChangelogBroker server = null; |
| | | final int TOTAL_MSG = 1000; // number of messages to send during the test |
| | | final int THREADS = 2; // number of threads that will produce |
| | | // and read the messages. |
| | | |
| | | BrokerWriter producer[] = new BrokerWriter[THREADS]; |
| | | BrokerReader reader[] = new BrokerReader[THREADS]; |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Start the producer threads. |
| | | */ |
| | | for (int i =0; i< THREADS; i++) |
| | | { |
| | | short serverId = (short) (10+i); |
| | | ChangeNumberGenerator gen = |
| | | new ChangeNumberGenerator(serverId , (long) 0); |
| | | ChangelogBroker broker = |
| | | openChangelogSession( DN.decode("dc=example,dc=com"), serverId, |
| | | 100, changelogPort, 1000, 1000, 0, true); |
| | | |
| | | producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS); |
| | | reader[i] = new BrokerReader(broker); |
| | | |
| | | } |
| | | |
| | | for (int i =0; i< THREADS; i++) |
| | | { |
| | | producer[i].start(); |
| | | } |
| | | |
| | | for (int i =0; i< THREADS; i++) |
| | | { |
| | | reader[i].start(); |
| | | } |
| | | |
| | | for (int i =0; i< THREADS; i++) |
| | | { |
| | | producer[i].join(); |
| | | reader[i].join(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stops the changelog server after the tests of this class have run. |
| | | * Does nothing when the {@code changelog} field is {@code null}. |
| | | * |
| | | * @throws Exception propagated from {@code changelog.shutdown()}, if any. |
| | | */ |
| | | @AfterClass() |
| | | public void shutdown() throws Exception |
| | | { |
| | | if (changelog != null) |
| | | changelog.shutdown(); |
| | | } |
| | | /** |
| | | * Reader thread that drains a changelog broker: it repeatedly calls |
| | | * {@code broker.receive()} and stops as soon as a {@code null} message is |
| | | * returned or an exception is thrown. |
| | | * Received messages are discarded; this class does not keep a count of them. |
| | | */ |
| | | private class BrokerReader extends Thread |
| | | { |
| | | private ChangelogBroker broker; |
| | | |
| | | /** |
| | | * Creates a new stress test reader. |
| | | * @param broker the changelog broker this thread receives messages from |
| | | */ |
| | | public BrokerReader(ChangelogBroker broker) |
| | | { |
| | | this.broker = broker; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | // loop receiving messages until receive() returns null (presumably a timeout |
| | | // when nothing is left - TODO confirm) or throws; the exception is swallowed. |
| | | try |
| | | { |
| | | while (true) |
| | | { |
| | | SynchronizationMessage msg = broker.receive(); |
| | | if (msg == null) |
| | | break; |
| | | } |
| | | } catch (Exception e) { |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Chaining tests of the changelog code with 2 changelog servers involved. |
| | | * Two tests are done here (itest=0 or itest=1). |
| | | * |
| | | * |
| | | * Test 1 |
| | | * - Create changelog server 1 |
| | | * - Create changelog server 2 connected with changelog server 1 |
| | |
| | | * - Create and connect client 2 to changelog server 2 |
| | | * - Make client1 publish changes |
| | | * - Check that client 2 receives the changes published by client 1 |
| | | * |
| | | * |
| | | * Test 2 |
| | | * - Create changelog server 1 |
| | | * - Create and connect client1 to changelog server 1 |
| | |
| | | * - Create changelog server 2 connected with changelog server 1 |
| | | * - Create and connect client 2 to changelog server 2 |
| | | * - Check that client 2 receives the changes published by client 1 |
| | | * |
| | | * |
| | | */ |
| | | @Test(enabled=true) |
| | | public void changelogChaining() throws Exception |
| | |
| | | String changelogLdif = "dn: cn=Changelog Server\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" |
| | | + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n" |
| | | + "cn: Changelog Server\n" |
| | | + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n" |
| | | + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n" |
| | | + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n" |
| | | + "ds-cfg-window-size: 100" + "\n" |
| | |
| | | |
| | | try |
| | | { |
| | | // For itest=0, create and connect client1 to changelog1 |
| | | // For itest=0, create and connect client1 to changelog1 |
| | | // and client2 to changelog2 |
| | | // For itest=1, only create and connect client1 to changelog1 |
| | | // For itest=1, only create and connect client1 to changelog1 |
| | | // client2 will be created later |
| | | broker1 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges); |
| | | brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges); |
| | | |
| | | if (itest == 0) |
| | | { |
| | | broker2 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges); |
| | | brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges); |
| | | } |
| | | |
| | | // - Test messages between clients by publishing now |
| | |
| | | // - Delete |
| | | long time = TimeThread.getTime(); |
| | | int ts = 1; |
| | | ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | ChangeNumber cn = new ChangeNumber(time, ts++, brokerIds[0]); |
| | | |
| | | DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid"); |
| | | broker1.publish(delMsg); |
| | |
| | | + "objectClass: top\n" + "objectClass: domain\n" |
| | | + "entryUUID: 11111111-1111-1111-1111-111111111111\n"); |
| | | Entry entry = TestCaseUtils.entryFromLdifString(lentry); |
| | | cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | cn = new ChangeNumber(time, ts++, brokerIds[0]); |
| | | AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com", |
| | | user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry |
| | | .getAttributes(), new ArrayList<Attribute>()); |
| | |
| | | Modification mod1 = new Modification(ModificationType.REPLACE, attr1); |
| | | List<Modification> mods = new ArrayList<Modification>(); |
| | | mods.add(mod1); |
| | | cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | cn = new ChangeNumber(time, ts++, brokerIds[0]); |
| | | ModifyMsg modMsg = new ModifyMsg(cn, DN |
| | | .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid"); |
| | | broker1.publish(modMsg); |
| | | |
| | | // - ModifyDN |
| | | cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]); |
| | | cn = new ChangeNumber(time, ts++, brokerIds[0]); |
| | | ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN |
| | | .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true, |
| | | null); |
| | |
| | | String changelogLdif = "dn: cn=Changelog Server\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" |
| | | + "cn: Changelog Server\n" |
| | | + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n" |
| | | + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n" |
| | | + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n" |
| | | + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n"; |
| | | Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); |
| | | ConfigEntry changelogConfig = new ConfigEntry(tmp, null); |
| | |
| | | |
| | | // Connect broker 2 to changelog2 |
| | | broker2 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges); |
| | | brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges); |
| | | } |
| | | |
| | | // - Check the msgs received by the broker, through changeLog2 |
| | |
| | | } |
| | | } |
| | | // Check that everything expected has been received |
| | | assertTrue(ts == 1, "Broker2 did not receive the complete set of" |
| | | assertTrue(ts == 1, "Broker2 did not receive the complete set of" |
| | | + " expected messages: #msg received " + ts); |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stops the changelog server after the tests of this class have run. |
| | | * Does nothing when the {@code changelog} field is {@code null}. |
| | | * |
| | | * @throws Exception propagated from {@code changelog.shutdown()}, if any. |
| | | */ |
| | | @AfterClass() |
| | | public void shutdown() throws Exception |
| | | { |
| | | if (changelog != null) |
| | | changelog.shutdown(); |
| | | } |
| | | |
| | | /** |
| | | * This class allows creating reader threads. |
| | | * Each one continuously reads messages from a changelog broker until |
| | | * {@code broker.receive()} returns {@code null} or throws an exception. |
| | | * Received messages are discarded; no count of them is kept. |
| | | */ |
| | | private class BrokerReader extends Thread |
| | | { |
| | | private ChangelogBroker broker; |
| | | |
| | | /** |
| | | * Creates a new stress test reader. |
| | | * @param broker the changelog broker this thread receives messages from |
| | | */ |
| | | public BrokerReader(ChangelogBroker broker) |
| | | { |
| | | this.broker = broker; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | // loop receiving messages until receive() returns null (presumably a timeout |
| | | // when nothing is left - TODO confirm) or throws; the exception is swallowed. |
| | | try |
| | | { |
| | | while (true) |
| | | { |
| | | SynchronizationMessage msg = broker.receive(); |
| | | if (msg == null) |
| | | break; |
| | | } |
| | | } catch (Exception e) { |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This class allows creating writer threads that can |
| | | * be used as producers for the Changelog stress tests. |
| | | * |
| | | * The constructor only stores its three arguments: the broker to publish |
| | | * on, the change number generator and the number of messages to publish. |
| | | * When run, the thread publishes {@code count} {@code DeleteMsg} messages |
| | | * for "o=test,dc=example,dc=com", each with a change number obtained from |
| | | * {@code gen.NewChangeNumber()}. |
| | | */ |
| | | private class BrokerWriter extends Thread |
| | | { |
| | | int count; |
| | | private ChangelogBroker broker; |
| | | ChangeNumberGenerator gen; |
| | | |
| | | public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen, |
| | | int count) |
| | | { |
| | | this.broker = broker; |
| | | this.count = count; |
| | | this.gen = gen; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | /* |
| | | * Simple loop creating changes and sending them |
| | | * to the changelog server. |
| | | * The count field is decremented before each publish, so exactly |
| | | * count messages are sent (none when count is not positive). |
| | | */ |
| | | while (count>0) |
| | | { |
| | | count--; |
| | | |
| | | DeleteMsg msg = |
| | | new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(), |
| | | "uid"); |
| | | broker.publish(msg); |
| | | } |
| | | } |
| | | } |
| | | } |