/* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2016 ForgeRock AS. */ package org.opends.server.replication.server; import static org.forgerock.opendj.ldap.ModificationType.*; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.replication.protocol.OperationContext.*; import static org.opends.server.util.CollectionUtils.*; import static org.opends.server.util.StaticUtils.*; import static org.testng.Assert.*; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import org.assertj.core.api.Assertions; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigException; import org.forgerock.opendj.ldap.DN; import org.forgerock.opendj.ldap.RDN; import org.opends.server.TestCaseUtils; import org.opends.server.api.SynchronizationProvider; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyDNOperationBasis; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.plugin.DummyReplicationDomain; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.plugin.ReplicationServerListener; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.ModifyDnContext; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ReplServerStartDSMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ServerStartMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.StartSessionMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.WindowMsg; import org.opends.server.replication.protocol.WindowProbeMsg; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.types.Attributes; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.HostPort; import org.opends.server.types.Modification; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** Tests for the replicationServer code. */ @SuppressWarnings("javadoc") public class ReplicationServerTest extends ReplicationTestCase { /** The tracer object for the debug logger. */ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private DN TEST_ROOT_DN; private DN EXAMPLE_DN; /** The replicationServer that will be used in this test. */ private ReplicationServer replicationServer; /** The port of the replicationServer. */ private int replicationServerPort; private CSN firstCSNServer1; private CSN secondCSNServer1; private CSN firstCSNServer2; private CSN secondCSNServer2; private CSN unknownCSNServer1; /** Set up the environment for performing the tests in this Class. */ @BeforeClass @Override public void setUp() throws Exception { super.setUp(); TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING); EXAMPLE_DN = DN.valueOf("ou=example," + TEST_ROOT_DN_STRING); // This test suite depends on having the schema available. configure(); } /** Start the server and configure a replicationServer. */ private void configure() throws Exception { replicationServerPort = TestCaseUtils.findFreePort(); TestCaseUtils.dsconfig( "create-replication-server", "--provider-name", "Multimaster Synchronization", "--set", "replication-db-directory:" + "replicationServerTestConfigureDb", "--set", "replication-port:" + replicationServerPort, "--set", "replication-server-id:71"); for (SynchronizationProvider> provider : DirectoryServer .getSynchronizationProviders()) { if (provider instanceof MultimasterReplication) { MultimasterReplication mmp = (MultimasterReplication) provider; ReplicationServerListener list = mmp.getReplicationServerListener(); if (list != null) { replicationServer = list.getReplicationServer(); if (replicationServer != null) { break; } } } } assertNotNull(replicationServer); } private void debugInfo(String s) { //logger.error(LocalizableMessage.raw("** TEST ** " + s)); if (logger.isTraceEnabled()) { logger.trace("** TEST ** " + s); } } /** * The tests in this class only works in a specific order. * This method is used to make sure that this order is always respected. * (Using testng dependency does not work) */ @Test(enabled = true) public void replicationServerTest() throws Exception { ReplicationBroker[] brokers1And2 = null; try { brokers1And2 = changelogBasic(); newClientLateServer1(); newClient(); newClientWithFirstChanges(); newClientWithChangefromServer1(); newClientWithChangefromServer2(); newClientWithUnknownChanges(); } finally { stop(brokers1And2); } stopChangelog(); } /** * This test allows to check the behavior of the Replication Server * when the DS disconnect and reconnect again. * In order to stress the protocol in such case, connection and * disconnection is done inside an infinite loop and therefore this * test is disabled and should only be enabled in workspaces but never * committed in the repository. */ @Test(enabled=false) public void replicationServerTestLoop() throws Exception { ReplicationBroker[] brokers1And2 = null; try { brokers1And2 = changelogBasic(); while (true) { newClient(); } } finally { stop(brokers1And2); } } /** Create two brokers: open for each a sender session and a receiver session to the replicationServer. */ private ReplicationBroker[] createReplicationBrokers1And2() throws Exception { return new ReplicationBroker[] { openReplicationSession(TEST_ROOT_DN, 1, 100, replicationServerPort, 1000), openReplicationSession(TEST_ROOT_DN, 2, 100, replicationServerPort, 1000) }; } @Override protected ReplicationBroker openReplicationSession(final DN baseDN, int serverId, int windowSize, int port, int timeout) throws Exception { ReplicationBroker broker = super.openReplicationSession(baseDN, serverId, windowSize, port, timeout); assertTrue(broker.isConnected()); return broker; } /** * Basic test of the replicationServer code : Connect 2 clients to the * replicationServer and exchange messages between the clients. Note : Other * tests in this file depends on this test and may need to change if this test * is modified. * * @return the two brokers created to simulate 2 DS sending and receiving * messages. They are returned to allow other tests to close the * brokers themselves. */ private ReplicationBroker[] changelogBasic() throws Exception { clearChangelogDB(replicationServer); debugInfo("Starting changelogBasic"); ReplicationBroker[] brokers1And2 = createReplicationBrokers1And2(); ReplicationBroker server1 = brokers1And2[0]; ReplicationBroker server2 = brokers1And2[1]; /* * Create CSNs for the messages sent from server 1 with current time * sequence 1 and with current time + 2 sequence 2 */ long time = TimeThread.getTime(); firstCSNServer1 = new CSN(time, 1, 1); secondCSNServer1 = new CSN(time + 2, 2, 1); /* * Create CSNs for the messages sent from server 2 with current time * sequence 1 and with current time + 3 sequence 2 */ firstCSNServer2 = new CSN(time + 1, 1, 2); secondCSNServer2 = new CSN(time + 3, 2, 2); /* * Create a CSN between firstCSNServer1 and secondCSNServer1 that will not * be used to create a change sent to the replicationServer but that will * be used in the Server State when opening a connection to the * ReplicationServer to make sure that the ReplicationServer is able to * accept such clients. */ unknownCSNServer1 = new CSN(time + 1, 1, 1); sendAndReceiveDeleteMsg(server1, server2, EXAMPLE_DN, firstCSNServer1, "uid"); sendAndReceiveDeleteMsg(server1, server2, TEST_ROOT_DN, secondCSNServer1, "uid"); // Send and receive a Delete Msg from server 2 to server 1 sendAndReceiveDeleteMsg(server2, server1, EXAMPLE_DN, firstCSNServer2, "other-uid"); sendAndReceiveDeleteMsg(server2, server1, TEST_ROOT_DN, secondCSNServer2, "uid"); debugInfo("Ending changelogBasic"); return new ReplicationBroker[] { server1, server2 }; } private void sendAndReceiveDeleteMsg(ReplicationBroker sender, ReplicationBroker receiver, DN dn, CSN csn, String entryUUID) throws Exception { DeleteMsg sentMsg = new DeleteMsg(dn, csn, entryUUID); sender.publish(sentMsg); ReplicationMsg receivedMsg = receiver.receive(); receiver.updateWindowAfterReplay(); assertDeleteMsgBodyEquals(sentMsg, receivedMsg); } private void assertDeleteMsgBodyEquals(DeleteMsg sentMsg, ReplicationMsg receivedMsg) { Assertions.assertThat(receivedMsg).isInstanceOf(DeleteMsg.class); assertEquals(receivedMsg.toString(), sentMsg.toString(), "ReplicationServer basic : incorrect message body received. CSN is same as \"" + getCSNFieldName(((DeleteMsg) receivedMsg).getCSN()) + "\" field."); } private String getCSNFieldName(CSN csn) { if (csn == null) { return ""; } else if (csn.equals(firstCSNServer1)) { return "firstCSNServer1"; } else if (csn.equals(secondCSNServer1)) { return "secondCSNServer1"; } else if (csn.equals(firstCSNServer2)) { return "firstCSNServer2"; } else if (csn.equals(secondCSNServer2)) { return "secondCSNServer2"; } else if (csn.equals(unknownCSNServer1)) { return "unknownCSNServer1"; } return null; } private ServerState newServerState(CSN... csns) { ServerState state = new ServerState(); for (CSN csn : csns) { state.update(csn); } return state; } /** Test that a new client see the change that was sent in the previous test. */ private void newClient() throws Exception { debugInfo("Starting newClient"); ReplicationBroker broker = null; try { broker = openReplicationSession(TEST_ROOT_DN, 3, 100, replicationServerPort, 1000); ReplicationMsg receivedMsg = broker.receive(); broker.updateWindowAfterReplay(); assertDeleteMsgCSNEquals(receivedMsg, firstCSNServer1, "first"); debugInfo("Ending newClient"); } finally { stop(broker); } } /** Test that a client that has already seen some changes now receive the correct next change. */ private void newClientWithChanges(ServerState state, CSN nextCSN) throws Exception { ReplicationBroker broker = null; // Connect to the replicationServer using the state created above. try { final long generationId = getGenerationId(TEST_ROOT_DN); broker = new ReplicationBroker(new DummyReplicationDomain(generationId), state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort), getReplSessionSecurity()); connect(broker, 5000); ReplicationMsg receivedMsg = broker.receive(); broker.updateWindowAfterReplay(); assertDeleteMsgCSNEquals(receivedMsg, nextCSN, "second"); } finally { stop(broker); } } /** Asserts that the CSN for the passed in message matches the supplied CSN. */ private void assertDeleteMsgCSNEquals(ReplicationMsg msg, CSN nextCSN, String msgNumber) { Assertions.assertThat(msg).isInstanceOf(DeleteMsg.class); DeleteMsg del = (DeleteMsg) msg; assertEquals(del.getCSN(), nextCSN, "The " + msgNumber + " message received by a new client was the wrong one."); } /** Test that a client that has already seen the first change now see the second change. */ private void newClientWithFirstChanges() throws Exception { debugInfo("Starting newClientWithFirstChanges"); /* * Create a ServerState updated with the first changes from both servers * done in test changelogBasic. */ ServerState state = newServerState(firstCSNServer1, firstCSNServer2); newClientWithChanges(state, secondCSNServer1); debugInfo("Ending newClientWithFirstChanges"); } /** Test with a client that has already seen a Change that the ReplicationServer has not seen. */ private void newClientWithUnknownChanges() throws Exception { debugInfo("Starting newClientWithUnknownChanges"); ServerState state = newServerState(unknownCSNServer1, secondCSNServer2); newClientWithChanges(state, secondCSNServer1); debugInfo("Ending newClientWithUnknownChanges"); } /** * Test that a client that has already seen the first change from server 1 * now see the first change from server 2. */ private void newClientWithChangefromServer1() throws Exception { debugInfo("Starting newClientWithChangefromServer1"); ServerState state = newServerState(firstCSNServer1); newClientWithChanges(state, firstCSNServer2); debugInfo("Ending newClientWithChangefromServer1"); } /** * Test that a client that has already seen the first change from server 2 * now see the first change from server 1. */ private void newClientWithChangefromServer2() throws Exception { debugInfo("Starting newClientWithChangefromServer2"); ServerState state = newServerState(firstCSNServer2); newClientWithChanges(state, firstCSNServer1); debugInfo("Ending newClientWithChangefromServer2"); } /** Test that a client that has not seen the second change from server 1 now receive it. */ private void newClientLateServer1() throws Exception { debugInfo("Starting newClientLateServer1"); ServerState state = newServerState(secondCSNServer2, firstCSNServer1); newClientWithChanges(state, secondCSNServer1); debugInfo("Ending newClientLateServer1"); } /** * Test that newClient() and newClientWithFirstChange() still works * after stopping and restarting the replicationServer. */ private void stopChangelog() throws Exception { ReplicationBroker[] brokers1And2 = null; try { debugInfo("Starting stopChangelog"); shutdown(); configure(); brokers1And2 = createReplicationBrokers1And2(); newClient(); newClientWithFirstChanges(); newClientWithChangefromServer1(); newClientWithChangefromServer2(); debugInfo("Ending stopChangelog"); } finally { stop(brokers1And2); } } /** * Stress test from client using the ReplicationBroker API * to the replicationServer. * This test allow to investigate the behaviour of the * ReplicationServer when it needs to distribute the load of * updates from a single LDAP server to a number of LDAP servers. * * This test is configured by a relatively low stress * but can be changed using TOTAL_MSG and CLIENT_THREADS consts. */ @Test(enabled = true) public void oneWriterMultipleReader() throws Exception { debugInfo("Starting oneWriterMultipleReader"); clearChangelogDB(replicationServer); TestCaseUtils.initializeTestBackend(true); ReplicationBroker server = null; BrokerReader reader = null; int TOTAL_MSG = 1000; // number of messages to send during the test int CLIENT_THREADS = 2; // number of threads that will try to read // the messages CSNGenerator gen = new CSNGenerator(5 , 0); BrokerReader client[] = new BrokerReader[CLIENT_THREADS]; ReplicationBroker clientBroker[] = new ReplicationBroker[CLIENT_THREADS]; try { /* Open a sender session */ server = openReplicationSession(TEST_ROOT_DN, 5, 100, replicationServerPort, 100000); reader = new BrokerReader(server, TOTAL_MSG); /* Start the client threads. */ for (int i =0; i< CLIENT_THREADS; i++) { clientBroker[i] = openReplicationSession(TEST_ROOT_DN, 100+i, 100, replicationServerPort, 1000); client[i] = new BrokerReader(clientBroker[i], TOTAL_MSG); } for (BrokerReader c : client) { c.start(); } reader.start(); /* Simple loop creating changes and sending them to the replicationServer. */ for (int i = 0; i< TOTAL_MSG; i++) { server.publish(new DeleteMsg(EXAMPLE_DN, gen.newCSN(), "uid")); } debugInfo("Ending oneWriterMultipleReader"); } finally { if (reader != null) { reader.join(10000); } stop(server); join(client); stop(clientBroker); if (reader != null) { assertNull(reader.errDetails, reader.exc + " " + reader.errDetails); } } } /** * Stress test from client using the ReplicationBroker API * to the replicationServer. * * This test allow to investigate the behavior of the * ReplicationServer when it needs to distribute the load of * updates from multiple LDAP server to a number of LDAP servers. * * This test is configured for a relatively low stress * but can be changed using TOTAL_MSG and THREADS consts. */ @Test(enabled = true, groups = "opendj-256") public void multipleWriterMultipleReader() throws Exception { debugInfo("Starting multipleWriterMultipleReader"); final int TOTAL_MSG = 1000; // number of messages to send during the test final int THREADS = 2; // number of threads that will produce // and read the messages. BrokerWriter producer[] = new BrokerWriter[THREADS]; BrokerReader reader[] = new BrokerReader[THREADS]; ReplicationBroker broker[] = new ReplicationBroker[THREADS]; clearChangelogDB(replicationServer); TestCaseUtils.initializeTestBackend(true); try { /* Start the producer threads. */ for (int i = 0; i< THREADS; i++) { int serverId = 10 + i; CSNGenerator gen = new CSNGenerator(serverId , 0); broker[i] = openReplicationSession(TEST_ROOT_DN, serverId, 100, replicationServerPort, 3000); producer[i] = new BrokerWriter(broker[i], gen, TOTAL_MSG/THREADS); reader[i] = new BrokerReader(broker[i], (TOTAL_MSG/THREADS)*(THREADS-1)); } for (BrokerWriter p : producer) { p.start(); } for (BrokerReader r : reader) { r.start(); } debugInfo("multipleWriterMultipleReader produces and readers started"); } finally { debugInfo("multipleWriterMultipleReader wait producers end"); join(producer); debugInfo("multipleWriterMultipleReader producers ended, now wait readers end"); join(reader); debugInfo("multipleWriterMultipleReader reader's ended, now stop brokers"); stop(broker); debugInfo("multipleWriterMultipleReader brokers stopped"); for (BrokerReader r : reader) { if (r != null) { assertNull(r.errDetails, r.exc + " " + r.errDetails); } } } debugInfo("Ending multipleWriterMultipleReader"); } private void join(Thread[] threads) throws InterruptedException { for (Thread t : threads) { if (t != null) { t.join(10000); // kill the thread in case it is not yet stopped. t.interrupt(); } } } /** *