/*
|
* The contents of this file are subject to the terms of the Common Development and
|
* Distribution License (the License). You may not use this file except in compliance with the
|
* License.
|
*
|
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
|
* specific language governing permission and limitations under the License.
|
*
|
* When distributing Covered Software, include this CDDL Header Notice in each file and include
|
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
|
* Header, with the fields enclosed by brackets [] replaced by your own identifying
|
* information: "Portions Copyright [year] [name of copyright owner]".
|
*
|
* Copyright 2006-2009 Sun Microsystems, Inc.
|
* Portions Copyright 2011-2016 ForgeRock AS.
|
*/
|
package org.opends.server.replication.server;
|
|
import java.net.InetSocketAddress;
|
import java.net.Socket;
|
import java.net.SocketTimeoutException;
|
import java.util.ArrayList;
|
import java.util.Arrays;
|
import java.util.List;
|
import java.util.SortedSet;
|
import java.util.TreeSet;
|
|
import org.assertj.core.api.Assertions;
|
import org.forgerock.i18n.slf4j.LocalizedLogger;
|
import org.forgerock.opendj.config.server.ConfigException;
|
import org.forgerock.opendj.ldap.DN;
|
import org.forgerock.opendj.ldap.RDN;
|
import org.opends.server.TestCaseUtils;
|
import org.opends.server.api.SynchronizationProvider;
|
import org.opends.server.core.DirectoryServer;
|
import org.opends.server.core.ModifyDNOperationBasis;
|
import org.opends.server.replication.ReplicationTestCase;
|
import org.opends.server.replication.common.AssuredMode;
|
import org.opends.server.replication.common.CSN;
|
import org.opends.server.replication.common.CSNGenerator;
|
import org.opends.server.replication.common.ServerState;
|
import org.opends.server.replication.common.ServerStatus;
|
import org.opends.server.replication.plugin.DummyReplicationDomain;
|
import org.opends.server.replication.plugin.MultimasterReplication;
|
import org.opends.server.replication.plugin.ReplicationServerListener;
|
import org.opends.server.replication.protocol.AddMsg;
|
import org.opends.server.replication.protocol.DeleteMsg;
|
import org.opends.server.replication.protocol.ModifyDNMsg;
|
import org.opends.server.replication.protocol.ModifyDnContext;
|
import org.opends.server.replication.protocol.ModifyMsg;
|
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
|
import org.opends.server.replication.protocol.ReplSessionSecurity;
|
import org.opends.server.replication.protocol.ReplicationMsg;
|
import org.opends.server.replication.protocol.ServerStartMsg;
|
import org.opends.server.replication.protocol.Session;
|
import org.opends.server.replication.protocol.StartSessionMsg;
|
import org.opends.server.replication.protocol.TopologyMsg;
|
import org.opends.server.replication.protocol.UpdateMsg;
|
import org.opends.server.replication.protocol.WindowMsg;
|
import org.opends.server.replication.protocol.WindowProbeMsg;
|
import org.opends.server.replication.service.ReplicationBroker;
|
import org.opends.server.types.Attributes;
|
import org.opends.server.types.DirectoryConfig;
|
import org.opends.server.types.DirectoryException;
|
import org.opends.server.types.Entry;
|
import org.opends.server.types.HostPort;
|
import org.opends.server.types.Modification;
|
import org.opends.server.util.TimeThread;
|
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
|
import org.testng.annotations.AfterClass;
|
import org.testng.annotations.BeforeClass;
|
import org.testng.annotations.Test;
|
|
import static org.forgerock.opendj.ldap.ModificationType.*;
|
import static org.opends.server.TestCaseUtils.*;
|
import static org.opends.server.replication.protocol.OperationContext.*;
|
import static org.opends.server.util.CollectionUtils.*;
|
import static org.opends.server.util.StaticUtils.*;
|
import static org.testng.Assert.*;
|
|
/** Tests for the replicationServer code. */
|
@SuppressWarnings("javadoc")
|
public class ReplicationServerTest extends ReplicationTestCase
|
{
|
/** The tracer object for the debug logger. */
|
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
|
private DN TEST_ROOT_DN;
|
private DN EXAMPLE_DN;
|
/** The replicationServer that will be used in this test. */
|
private ReplicationServer replicationServer;
|
|
/** The port of the replicationServer. */
|
private int replicationServerPort;
|
|
private CSN firstCSNServer1;
|
private CSN secondCSNServer1;
|
private CSN firstCSNServer2;
|
private CSN secondCSNServer2;
|
|
private CSN unknownCSNServer1;
|
|
/** Set up the environment for performing the tests in this Class. */
|
@BeforeClass
|
@Override
|
public void setUp() throws Exception
|
{
|
super.setUp();
|
TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
|
EXAMPLE_DN = DN.valueOf("ou=example," + TEST_ROOT_DN_STRING);
|
|
// This test suite depends on having the schema available.
|
configure();
|
}
|
|
/** Start the server and configure a replicationServer. */
|
private void configure() throws Exception
|
{
|
replicationServerPort = TestCaseUtils.findFreePort();
|
|
TestCaseUtils.dsconfig(
|
"create-replication-server",
|
"--provider-name", "Multimaster Synchronization",
|
"--set", "replication-db-directory:" + "replicationServerTestConfigureDb",
|
"--set", "replication-port:" + replicationServerPort,
|
"--set", "replication-server-id:71");
|
|
for (SynchronizationProvider<?> provider : DirectoryServer
|
.getSynchronizationProviders()) {
|
if (provider instanceof MultimasterReplication) {
|
MultimasterReplication mmp = (MultimasterReplication) provider;
|
ReplicationServerListener list = mmp.getReplicationServerListener();
|
if (list != null) {
|
replicationServer = list.getReplicationServer();
|
if (replicationServer != null) {
|
break;
|
}
|
}
|
}
|
}
|
assertNotNull(replicationServer);
|
}
|
|
private void debugInfo(String s)
|
{
|
//logger.error(LocalizableMessage.raw("** TEST ** " + s));
|
if (logger.isTraceEnabled())
|
{
|
logger.trace("** TEST ** " + s);
|
}
|
}
|
|
/**
|
* The tests in this class only works in a specific order.
|
* This method is used to make sure that this order is always respected.
|
* (Using testng dependency does not work)
|
*/
|
@Test(enabled = true)
|
public void replicationServerTest() throws Exception
|
{
|
ReplicationBroker[] brokers1And2 = null;
|
try {
|
brokers1And2 = changelogBasic();
|
newClientLateServer1();
|
newClient();
|
newClientWithFirstChanges();
|
newClientWithChangefromServer1();
|
newClientWithChangefromServer2();
|
newClientWithUnknownChanges();
|
}
|
finally
|
{
|
stop(brokers1And2);
|
}
|
stopChangelog();
|
}
|
|
/**
|
* This test allows to check the behavior of the Replication Server
|
* when the DS disconnect and reconnect again.
|
* In order to stress the protocol in such case, connection and
|
* disconnection is done inside an infinite loop and therefore this
|
* test is disabled and should only be enabled in workspaces but never
|
* committed in the repository.
|
*/
|
@Test(enabled=false)
|
public void replicationServerTestLoop() throws Exception
|
{
|
ReplicationBroker[] brokers1And2 = null;
|
try
|
{
|
brokers1And2 = changelogBasic();
|
while (true)
|
{
|
newClient();
|
}
|
}
|
finally
|
{
|
stop(brokers1And2);
|
}
|
}
|
|
/** Create two brokers: open for each a sender session and a receiver session to the replicationServer. */
|
private ReplicationBroker[] createReplicationBrokers1And2() throws Exception {
|
return new ReplicationBroker[] {
|
openReplicationSession(TEST_ROOT_DN, 1, 100, replicationServerPort, 1000),
|
openReplicationSession(TEST_ROOT_DN, 2, 100, replicationServerPort, 1000)
|
};
|
}
|
|
@Override
|
protected ReplicationBroker openReplicationSession(final DN baseDN,
|
int serverId, int windowSize, int port, int timeout) throws Exception
|
{
|
ReplicationBroker broker = super.openReplicationSession(baseDN, serverId, windowSize, port, timeout);
|
assertTrue(broker.isConnected());
|
return broker;
|
}
|
|
/**
|
* Basic test of the replicationServer code : Connect 2 clients to the
|
* replicationServer and exchange messages between the clients. Note : Other
|
* tests in this file depends on this test and may need to change if this test
|
* is modified.
|
*
|
* @return the two brokers created to simulate 2 DS sending and receiving
|
* messages. They are returned to allow other tests to close the
|
* brokers themselves.
|
*/
|
private ReplicationBroker[] changelogBasic() throws Exception
|
{
|
clearChangelogDB(replicationServer);
|
debugInfo("Starting changelogBasic");
|
ReplicationBroker[] brokers1And2 = createReplicationBrokers1And2();
|
ReplicationBroker server1 = brokers1And2[0];
|
ReplicationBroker server2 = brokers1And2[1];
|
|
/*
|
* Create CSNs for the messages sent from server 1 with current time
|
* sequence 1 and with current time + 2 sequence 2
|
*/
|
long time = TimeThread.getTime();
|
firstCSNServer1 = new CSN(time, 1, 1);
|
secondCSNServer1 = new CSN(time + 2, 2, 1);
|
|
/*
|
* Create CSNs for the messages sent from server 2 with current time
|
* sequence 1 and with current time + 3 sequence 2
|
*/
|
firstCSNServer2 = new CSN(time + 1, 1, 2);
|
secondCSNServer2 = new CSN(time + 3, 2, 2);
|
|
/*
|
* Create a CSN between firstCSNServer1 and secondCSNServer1 that will not
|
* be used to create a change sent to the replicationServer but that will
|
* be used in the Server State when opening a connection to the
|
* ReplicationServer to make sure that the ReplicationServer is able to
|
* accept such clients.
|
*/
|
unknownCSNServer1 = new CSN(time + 1, 1, 1);
|
|
sendAndReceiveDeleteMsg(server1, server2, EXAMPLE_DN, firstCSNServer1, "uid");
|
sendAndReceiveDeleteMsg(server1, server2, TEST_ROOT_DN, secondCSNServer1, "uid");
|
|
// Send and receive a Delete Msg from server 2 to server 1
|
sendAndReceiveDeleteMsg(server2, server1, EXAMPLE_DN, firstCSNServer2, "other-uid");
|
sendAndReceiveDeleteMsg(server2, server1, TEST_ROOT_DN, secondCSNServer2, "uid");
|
|
debugInfo("Ending changelogBasic");
|
return new ReplicationBroker[] { server1, server2 };
|
}
|
|
private void sendAndReceiveDeleteMsg(ReplicationBroker sender, ReplicationBroker receiver,
|
DN dn, CSN csn, String entryUUID) throws Exception
|
{
|
DeleteMsg sentMsg = new DeleteMsg(dn, csn, entryUUID);
|
sender.publish(sentMsg);
|
ReplicationMsg receivedMsg = receiver.receive();
|
receiver.updateWindowAfterReplay();
|
assertDeleteMsgBodyEquals(sentMsg, receivedMsg);
|
}
|
|
private void assertDeleteMsgBodyEquals(DeleteMsg sentMsg, ReplicationMsg receivedMsg)
|
{
|
Assertions.assertThat(receivedMsg).isInstanceOf(DeleteMsg.class);
|
assertEquals(receivedMsg.toString(), sentMsg.toString(),
|
"ReplicationServer basic : incorrect message body received. CSN is same as \""
|
+ getCSNFieldName(((DeleteMsg) receivedMsg).getCSN()) + "\" field.");
|
}
|
|
private String getCSNFieldName(CSN csn)
|
{
|
if (csn == null) {
|
return "";
|
}
|
else if (csn.equals(firstCSNServer1))
|
{
|
return "firstCSNServer1";
|
}
|
else if (csn.equals(secondCSNServer1))
|
{
|
return "secondCSNServer1";
|
}
|
else if (csn.equals(firstCSNServer2))
|
{
|
return "firstCSNServer2";
|
}
|
else if (csn.equals(secondCSNServer2))
|
{
|
return "secondCSNServer2";
|
}
|
else if (csn.equals(unknownCSNServer1))
|
{
|
return "unknownCSNServer1";
|
}
|
return null;
|
}
|
|
private ServerState newServerState(CSN... csns)
|
{
|
ServerState state = new ServerState();
|
for (CSN csn : csns)
|
{
|
state.update(csn);
|
}
|
return state;
|
}
|
|
/** Test that a new client see the change that was sent in the previous test. */
|
private void newClient() throws Exception
|
{
|
debugInfo("Starting newClient");
|
ReplicationBroker broker = null;
|
|
try {
|
broker = openReplicationSession(TEST_ROOT_DN, 3, 100, replicationServerPort, 1000);
|
|
ReplicationMsg receivedMsg = broker.receive();
|
broker.updateWindowAfterReplay();
|
assertDeleteMsgCSNEquals(receivedMsg, firstCSNServer1, "first");
|
debugInfo("Ending newClient");
|
}
|
finally
|
{
|
stop(broker);
|
}
|
}
|
|
/** Test that a client that has already seen some changes now receive the correct next change. */
|
private void newClientWithChanges(ServerState state, CSN nextCSN) throws Exception
|
{
|
ReplicationBroker broker = null;
|
|
// Connect to the replicationServer using the state created above.
|
try {
|
final long generationId = getGenerationId(TEST_ROOT_DN);
|
broker = new ReplicationBroker(new DummyReplicationDomain(generationId),
|
state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort),
|
getReplSessionSecurity());
|
connect(broker, 5000);
|
|
ReplicationMsg receivedMsg = broker.receive();
|
broker.updateWindowAfterReplay();
|
assertDeleteMsgCSNEquals(receivedMsg, nextCSN, "second");
|
}
|
finally
|
{
|
stop(broker);
|
}
|
}
|
|
/** Asserts that the CSN for the passed in message matches the supplied CSN. */
|
private void assertDeleteMsgCSNEquals(ReplicationMsg msg, CSN nextCSN, String msgNumber)
|
{
|
Assertions.assertThat(msg).isInstanceOf(DeleteMsg.class);
|
DeleteMsg del = (DeleteMsg) msg;
|
assertEquals(del.getCSN(), nextCSN, "The " + msgNumber
|
+ " message received by a new client was the wrong one.");
|
}
|
|
/** Test that a client that has already seen the first change now see the second change. */
|
private void newClientWithFirstChanges() throws Exception
|
{
|
debugInfo("Starting newClientWithFirstChanges");
|
/*
|
* Create a ServerState updated with the first changes from both servers
|
* done in test changelogBasic.
|
*/
|
ServerState state = newServerState(firstCSNServer1, firstCSNServer2);
|
newClientWithChanges(state, secondCSNServer1);
|
debugInfo("Ending newClientWithFirstChanges");
|
}
|
|
/** Test with a client that has already seen a Change that the ReplicationServer has not seen. */
|
private void newClientWithUnknownChanges() throws Exception
|
{
|
debugInfo("Starting newClientWithUnknownChanges");
|
ServerState state = newServerState(unknownCSNServer1, secondCSNServer2);
|
newClientWithChanges(state, secondCSNServer1);
|
debugInfo("Ending newClientWithUnknownChanges");
|
}
|
|
/**
|
* Test that a client that has already seen the first change from server 1
|
* now see the first change from server 2.
|
*/
|
private void newClientWithChangefromServer1() throws Exception
|
{
|
debugInfo("Starting newClientWithChangefromServer1");
|
ServerState state = newServerState(firstCSNServer1);
|
newClientWithChanges(state, firstCSNServer2);
|
debugInfo("Ending newClientWithChangefromServer1");
|
}
|
|
/**
|
* Test that a client that has already seen the first change from server 2
|
* now see the first change from server 1.
|
*/
|
private void newClientWithChangefromServer2() throws Exception
|
{
|
debugInfo("Starting newClientWithChangefromServer2");
|
ServerState state = newServerState(firstCSNServer2);
|
newClientWithChanges(state, firstCSNServer1);
|
debugInfo("Ending newClientWithChangefromServer2");
|
}
|
|
/** Test that a client that has not seen the second change from server 1 now receive it. */
|
private void newClientLateServer1() throws Exception
|
{
|
debugInfo("Starting newClientLateServer1");
|
ServerState state = newServerState(secondCSNServer2, firstCSNServer1);
|
newClientWithChanges(state, secondCSNServer1);
|
debugInfo("Ending newClientLateServer1");
|
}
|
|
/**
|
* Test that newClient() and newClientWithFirstChange() still works
|
* after stopping and restarting the replicationServer.
|
*/
|
private void stopChangelog() throws Exception
|
{
|
ReplicationBroker[] brokers1And2 = null;
|
try {
|
debugInfo("Starting stopChangelog");
|
shutdown();
|
configure();
|
brokers1And2 = createReplicationBrokers1And2();
|
newClient();
|
newClientWithFirstChanges();
|
newClientWithChangefromServer1();
|
newClientWithChangefromServer2();
|
debugInfo("Ending stopChangelog");
|
}
|
finally {
|
stop(brokers1And2);
|
}
|
}
|
|
/**
|
* Stress test from client using the ReplicationBroker API
|
* to the replicationServer.
|
* This test allow to investigate the behaviour of the
|
* ReplicationServer when it needs to distribute the load of
|
* updates from a single LDAP server to a number of LDAP servers.
|
*
|
* This test is configured by a relatively low stress
|
* but can be changed using TOTAL_MSG and CLIENT_THREADS consts.
|
*/
|
@Test(enabled = true)
|
public void oneWriterMultipleReader() throws Exception
|
{
|
debugInfo("Starting oneWriterMultipleReader");
|
|
clearChangelogDB(replicationServer);
|
TestCaseUtils.initializeTestBackend(true);
|
|
ReplicationBroker server = null;
|
BrokerReader reader = null;
|
int TOTAL_MSG = 1000; // number of messages to send during the test
|
int CLIENT_THREADS = 2; // number of threads that will try to read
|
// the messages
|
CSNGenerator gen = new CSNGenerator(5 , 0);
|
|
BrokerReader client[] = new BrokerReader[CLIENT_THREADS];
|
ReplicationBroker clientBroker[] = new ReplicationBroker[CLIENT_THREADS];
|
|
try
|
{
|
/* Open a sender session */
|
server = openReplicationSession(TEST_ROOT_DN, 5, 100, replicationServerPort, 100000);
|
reader = new BrokerReader(server, TOTAL_MSG);
|
|
/* Start the client threads. */
|
for (int i =0; i< CLIENT_THREADS; i++)
|
{
|
clientBroker[i] = openReplicationSession(TEST_ROOT_DN, 100+i, 100, replicationServerPort, 1000);
|
client[i] = new BrokerReader(clientBroker[i], TOTAL_MSG);
|
}
|
|
for (BrokerReader c : client)
|
{
|
c.start();
|
}
|
reader.start();
|
|
/* Simple loop creating changes and sending them to the replicationServer. */
|
for (int i = 0; i< TOTAL_MSG; i++)
|
{
|
server.publish(new DeleteMsg(EXAMPLE_DN, gen.newCSN(), "uid"));
|
}
|
debugInfo("Ending oneWriterMultipleReader");
|
}
|
finally
|
{
|
if (reader != null)
|
{
|
reader.join(10000);
|
}
|
stop(server);
|
join(client);
|
stop(clientBroker);
|
|
if (reader != null)
|
{
|
assertNull(reader.errDetails, reader.exc + " " + reader.errDetails);
|
}
|
}
|
}
|
|
/**
|
* Stress test from client using the ReplicationBroker API
|
* to the replicationServer.
|
*
|
* This test allow to investigate the behavior of the
|
* ReplicationServer when it needs to distribute the load of
|
* updates from multiple LDAP server to a number of LDAP servers.
|
*
|
* This test is configured for a relatively low stress
|
* but can be changed using TOTAL_MSG and THREADS consts.
|
*/
|
@Test(enabled = true, groups = "opendj-256")
|
public void multipleWriterMultipleReader() throws Exception
|
{
|
debugInfo("Starting multipleWriterMultipleReader");
|
final int TOTAL_MSG = 1000; // number of messages to send during the test
|
final int THREADS = 2; // number of threads that will produce
|
// and read the messages.
|
|
BrokerWriter producer[] = new BrokerWriter[THREADS];
|
BrokerReader reader[] = new BrokerReader[THREADS];
|
ReplicationBroker broker[] = new ReplicationBroker[THREADS];
|
|
clearChangelogDB(replicationServer);
|
TestCaseUtils.initializeTestBackend(true);
|
|
try
|
{
|
/* Start the producer threads. */
|
for (int i = 0; i< THREADS; i++)
|
{
|
int serverId = 10 + i;
|
CSNGenerator gen = new CSNGenerator(serverId , 0);
|
broker[i] = openReplicationSession(TEST_ROOT_DN,
|
serverId, 100, replicationServerPort, 3000);
|
|
producer[i] = new BrokerWriter(broker[i], gen, TOTAL_MSG/THREADS);
|
reader[i] = new BrokerReader(broker[i], (TOTAL_MSG/THREADS)*(THREADS-1));
|
}
|
|
for (BrokerWriter p : producer)
|
{
|
p.start();
|
}
|
|
for (BrokerReader r : reader)
|
{
|
r.start();
|
}
|
debugInfo("multipleWriterMultipleReader produces and readers started");
|
}
|
finally
|
{
|
debugInfo("multipleWriterMultipleReader wait producers end");
|
join(producer);
|
debugInfo("multipleWriterMultipleReader producers ended, now wait readers end");
|
join(reader);
|
debugInfo("multipleWriterMultipleReader reader's ended, now stop brokers");
|
stop(broker);
|
debugInfo("multipleWriterMultipleReader brokers stopped");
|
|
for (BrokerReader r : reader)
|
{
|
if (r != null)
|
{
|
assertNull(r.errDetails, r.exc + " " + r.errDetails);
|
}
|
}
|
}
|
debugInfo("Ending multipleWriterMultipleReader");
|
}
|
|
private void join(Thread[] threads) throws InterruptedException
|
{
|
for (Thread t : threads)
|
{
|
if (t != null)
|
{
|
t.join(10000);
|
// kill the thread in case it is not yet stopped.
|
t.interrupt();
|
}
|
}
|
}
|
|
/**
|
* <ol>
|
* <li>Create replication server 1</li>
|
* <li>Create replication server 2 connected with replication server 1</li>
|
* <li>Create and connect client 1 to replication server 1</li>
|
* <li>Create and connect client 2 to replication server 2</li>
|
* <li>Make client1 publish changes</li>
|
* <li>Check that client 2 receives the changes published by client 1</li>
|
* </ol>.
|
*/
|
@Test(enabled = true)
|
public void changelogChaining0() throws Exception
|
{
|
final String tn = "changelogChaining0";
|
debugInfo("Starting " + tn);
|
clearChangelogDB(replicationServer);
|
TestCaseUtils.initializeTestBackend(true);
|
|
{
|
ReplicationBroker broker2 = null;
|
|
// - Create 2 connected replicationServer
|
ReplicationServer[] changelogs = new ReplicationServer[2];
|
int[] changelogPorts = TestCaseUtils.findFreePorts(2);
|
int[] changelogIds = new int[] { 80, 81 };
|
int[] brokerIds = new int[] { 100, 101 };
|
|
// create the 2 connected replicationServer
|
for (int i = 0; i < 2; i++)
|
{
|
changelogs[i] = newReplicationServer(changelogPorts, changelogIds, i);
|
}
|
|
ReplicationBroker broker1 = null;
|
|
try
|
{
|
// create and connect client1 to changelog1 and client2 to changelog2
|
broker1 = openReplicationSession(TEST_ROOT_DN, brokerIds[0], 100, changelogPorts[0], 1000);
|
broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[0], 1000);
|
|
// - Test messages between clients by publishing now
|
CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
|
|
// - Delete
|
DN dn = DN.valueOf("o=example" + 0 + "," + TEST_ROOT_DN_STRING);
|
DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
|
broker1.publish(delMsg);
|
|
// - Add
|
AddMsg addMsg = addMsg(csnGen);
|
broker1.publish(addMsg);
|
|
// - Modify
|
ModifyMsg modMsg = modMsg(csnGen);
|
broker1.publish(modMsg);
|
|
// - ModifyDN
|
ModifyDNMsg modDNMsg = modDNMsg(csnGen);
|
broker1.publish(modDNMsg);
|
|
// - Check msg receives by broker, through changeLog2
|
List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
|
Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
|
debugInfo("Ending " + tn);
|
}
|
finally
|
{
|
remove(changelogs);
|
stop(broker1, broker2);
|
}
|
}
|
}
|
|
/**
|
* <ol>
|
* <li>Create replication server 1</li>
|
* <li>Create and connect client1 to replication server 1</li>
|
* <li>Make client1 publish changes</li>
|
* <li>Create replication server 2 connected with replication server 1</li>
|
* <li>Create and connect client 2 to replication server 2</li>
|
* <li>Check that client 2 receives the changes published by client 1</li>
|
* <ol>
|
*/
|
@Test(enabled = true)
|
public void changelogChaining1() throws Exception
|
{
|
final String tn = "changelogChaining1";
|
debugInfo("Starting " + tn);
|
clearChangelogDB(replicationServer);
|
TestCaseUtils.initializeTestBackend(true);
|
|
{
|
ReplicationBroker broker2 = null;
|
|
// - Create 2 connected replicationServer
|
ReplicationServer[] changelogs = new ReplicationServer[2];
|
int[] changelogPorts = TestCaseUtils.findFreePorts(2);
|
int[] changelogIds = new int[] { 80, 81 };
|
int[] brokerIds = new int[] { 100, 101 };
|
|
// create the 1rst replicationServer, the second one will be created later
|
changelogs[0] = newReplicationServer(changelogPorts, changelogIds, 0);
|
|
ReplicationBroker broker1 = null;
|
|
try
|
{
|
// only create and connect client1 to changelog1 client2 will be created later
|
broker1 = openReplicationSession(TEST_ROOT_DN, brokerIds[0], 100, changelogPorts[0], 1000);
|
|
// - Test messages between clients by publishing now
|
CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
|
|
// - Delete
|
DN dn = DN.valueOf("o=example" + 1 + "," + TEST_ROOT_DN_STRING);
|
DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
|
broker1.publish(delMsg);
|
|
// - Add
|
AddMsg addMsg = addMsg(csnGen);
|
broker1.publish(addMsg);
|
|
// - Modify
|
ModifyMsg modMsg = modMsg(csnGen);
|
broker1.publish(modMsg);
|
|
// - ModifyDN
|
ModifyDNMsg modDNMsg = modDNMsg(csnGen);
|
broker1.publish(modDNMsg);
|
|
changelogs[1] = newReplicationServer(changelogPorts, changelogIds, 1);
|
|
// Connect broker 2 to changelog2
|
broker2 = openReplicationSession(TEST_ROOT_DN,
|
brokerIds[1], 100, changelogPorts[1], 2000);
|
|
// - Check msg receives by broker, through changeLog2
|
List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
|
Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
|
debugInfo("Ending " + tn);
|
}
|
finally
|
{
|
remove(changelogs);
|
stop(broker1, broker2);
|
}
|
}
|
}
|
|
private ReplicationServer newReplicationServer(int[] changelogPorts, int[] changelogIds, int i)
|
throws ConfigException
|
{
|
SortedSet<String> servers = newTreeSet("localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
|
ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
|
changelogPorts[i], "replicationServerTestChangelogChainingDb"+i,
|
0, changelogIds[i], 0, 100, servers);
|
return new ReplicationServer(conf);
|
}
|
|
private ModifyDNMsg modDNMsg(CSNGenerator csnGen) throws DirectoryException
|
{
|
ModifyDNOperationBasis op = new ModifyDNOperationBasis(
|
connection, 1, 1, null, EXAMPLE_DN, RDN.valueOf("o=example2"), true, null);
|
op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
|
LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
|
return new ModifyDNMsg(localOp);
|
}
|
|
private AddMsg addMsg(CSNGenerator csnGen) throws Exception
|
{
|
String user1entryUUID = "33333333-3333-3333-3333-333333333333";
|
String baseUUID = "22222222-2222-2222-2222-222222222222";
|
|
Entry entry = TestCaseUtils.makeEntry(
|
"dn: o=example," + TEST_ROOT_DN_STRING,
|
"objectClass: top",
|
"objectClass: domain",
|
"entryUUID: " + user1entryUUID);
|
return new AddMsg(csnGen.newCSN(), EXAMPLE_DN, user1entryUUID, baseUUID,
|
entry.getObjectClassAttribute(), entry.getAllAttributes(), null);
|
}
|
|
private List<ReplicationMsg> receiveReplicationMsgs(ReplicationBroker broker2, int nbMessagesExpected)
|
{
|
List<ReplicationMsg> msgs = new ArrayList<>(nbMessagesExpected);
|
for (int i = 0; i < nbMessagesExpected; i++)
|
{
|
try
|
{
|
ReplicationMsg msg = broker2.receive();
|
if (msg == null)
|
{
|
break;
|
}
|
if (msg instanceof TopologyMsg)
|
{
|
continue; // ignore
|
}
|
msgs.add(msg);
|
|
broker2.updateWindowAfterReplay();
|
}
|
catch (SocketTimeoutException e)
|
{
|
fail("Broker receive failed: " + e.getMessage() + "#Msg:" + i);
|
}
|
}
|
return msgs;
|
}
|
|
/** Test that the Replication sends back correctly WindowsUpdate when we send a WindowProbeMsg. */
|
@Test(enabled = true)
|
public void windowProbeTest() throws Exception
|
{
|
debugInfo("Starting windowProbeTest");
|
final int WINDOW = 10;
|
|
clearChangelogDB(replicationServer);
|
TestCaseUtils.initializeTestBackend(true);
|
|
/*
|
* Open a session to the replication server.
|
*
|
* Some other tests may have been running before and therefore
|
* may have pushed some changes to the Replication Server
|
* When a new session is opened, the Replication Server is therefore
|
* going to send all these old changes to this Replication Server.
|
* To avoid this, this test open a first session, save the
|
* ServerState from the ReplicationServer, close the session
|
* and re-open a new connection providing the ServerState it just
|
* received from the first session.
|
* This should guarantee that old changes are not perturbing this test.
|
*/
|
|
// open the first session to the replication server
|
InetSocketAddress serverAddr =
|
new HostPort("localhost", replicationServerPort).toInetSocketAddress();
|
Socket socket = new Socket();
|
socket.setReceiveBufferSize(1000000);
|
socket.setTcpNoDelay(true);
|
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
|
socket.connect(serverAddr, timeoutMS);
|
ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
|
Session session = replSessionSecurity.createClientSession(socket, timeoutMS);
|
|
boolean sslEncryption = DirectoryConfig.getCryptoManager().isSslEncryption();
|
|
try
|
{
|
// send a ServerStartMsg with an empty ServerState.
|
String url = socket.getLocalAddress().getCanonicalHostName() + ":"
|
+ socket.getLocalPort();
|
ServerStartMsg msg = new ServerStartMsg(1723, url, TEST_ROOT_DN,
|
WINDOW, 5000, new ServerState(), 0, sslEncryption, (byte)-1);
|
session.publish(msg);
|
|
// Read the Replication Server state from the ReplServerStartDSMsg that
|
// comes back.
|
ReplServerStartDSMsg replStartDSMsg = (ReplServerStartDSMsg) session.receive();
|
int serverwindow = replStartDSMsg.getWindowSize();
|
if (!sslEncryption)
|
{
|
session.stopEncryption();
|
}
|
|
// Send StartSessionMsg
|
StartSessionMsg startSessionMsg = new StartSessionMsg(
|
ServerStatus.NORMAL_STATUS, new ArrayList<String>(),
|
false, AssuredMode.SAFE_DATA_MODE, (byte) 1);
|
session.publish(startSessionMsg);
|
|
// Read the TopologyMsg that should come back.
|
ReplicationMsg repMsg = session.receive();
|
Assertions.assertThat(repMsg).isInstanceOf(TopologyMsg.class);
|
|
// Now comes the real test : check that the Replication Server
|
// answers correctly to a WindowProbeMsg LocalizableMessage.
|
session.publish(new WindowProbeMsg());
|
|
WindowMsg windowMsg = waitForSpecificMsg(session, WindowMsg.class);
|
assertEquals(serverwindow, windowMsg.getNumAck());
|
|
// check that this did not change the window by sending a probe again.
|
session.publish(new WindowProbeMsg());
|
|
// We may receive some MonitoringMsg so use filter method
|
windowMsg = waitForSpecificMsg(session, WindowMsg.class);
|
assertEquals(serverwindow, windowMsg.getNumAck());
|
debugInfo("Ending windowProbeTest");
|
}
|
finally
|
{
|
session.close();
|
}
|
}
|
|
/**
|
* Clean up the environment.
|
*
|
* @throws Exception If the environment could not be set up.
|
*/
|
@AfterClass
|
@Override
|
public void classCleanUp() throws Exception
|
{
|
callParanoiaCheck = false;
|
super.classCleanUp();
|
|
replicationServer.getChangelogDB().removeDB();
|
shutdown();
|
|
paranoiaCheck();
|
}
|
|
/** After the tests stop the replicationServer. */
|
private void shutdown() throws Exception
|
{
|
TestCaseUtils.dsconfig(
|
"delete-replication-server",
|
"--provider-name", "Multimaster Synchronization");
|
replicationServer = null;
|
}
|
|
/**
|
* This class allows to create reader thread.
|
* They continuously reads messages from a replication broker until
|
* there is nothing left.
|
* They Count the number of received messages.
|
*/
|
private class BrokerReader extends Thread
|
{
|
private ReplicationBroker broker;
|
private int numMsgRcv;
|
private final int numMsgExpected;
|
private Exception exc;
|
private String errDetails;
|
|
/** Creates a new Stress Test Reader. */
|
public BrokerReader(ReplicationBroker broker, int numMsgExpected)
|
{
|
this.broker = broker;
|
this.numMsgExpected = numMsgExpected;
|
}
|
|
@Override
|
public void run()
|
{
|
debugInfo("BrokerReader " + broker.getServerId() + " starts");
|
|
// loop receiving messages until either we get a timeout
|
// because there is nothing left or an error condition happens.
|
try
|
{
|
while (true)
|
{
|
ReplicationMsg msg = broker.receive();
|
if (msg instanceof UpdateMsg)
|
{
|
numMsgRcv++;
|
broker.updateWindowAfterReplay();
|
}
|
// if ((msg == null) || (numMsgRcv >= numMsgExpected))
|
// Terminating this thread when the nb of msg received is reached
|
// may prevent to process a WindowMsg that would unblock the dual
|
// writer thread.
|
if (msg == null)
|
{
|
break;
|
}
|
}
|
} catch (SocketTimeoutException e)
|
{
|
if (numMsgRcv != numMsgExpected)
|
{
|
this.exc = e;
|
this.errDetails =
|
"BrokerReader " + broker.getServerId()
|
+ " did not received the expected message number : act="
|
+ numMsgRcv + " exp=" + numMsgExpected;
|
}
|
} catch (Exception e)
|
{
|
this.exc = e;
|
this.errDetails =
|
"a BrokerReader received an Exception" + e.getMessage()
|
+ stackTraceToSingleLineString(e);
|
}
|
}
|
}
|
|
/**
|
* This class allows to create writer thread that can
|
* be used as producers for the ReplicationServer stress tests.
|
*/
|
private class BrokerWriter extends Thread
|
{
|
private int count;
|
private ReplicationBroker broker;
|
private CSNGenerator gen;
|
|
public BrokerWriter(ReplicationBroker broker, CSNGenerator gen, int count)
|
{
|
this.broker = broker;
|
this.count = count;
|
this.gen = gen;
|
}
|
|
@Override
|
public void run()
|
{
|
debugInfo("writer " + broker.getServerId() + " starts to produce " + count);
|
int ccount = count;
|
/* Simple loop creating changes and sending them to the replicationServer. */
|
while (count>0)
|
{
|
count--;
|
|
DeleteMsg msg = new DeleteMsg(EXAMPLE_DN, gen.newCSN(), "uid");
|
broker.publish(msg);
|
|
if ((count % 10) == 0)
|
{
|
debugInfo("writer " + broker.getServerId() + " to send="+count);
|
}
|
}
|
debugInfo("writer " + broker.getServerId() + " ends sent="+ccount);
|
}
|
}
|
|
/**
|
* Replication Server configuration test of the replication Server code with
|
* 2 replication servers involved
|
*
|
* Test 1
|
* - Create replication server 1
|
* - Create replication server 2
|
* - Connect replication server 1 to replication server 2
|
* - Create and connect client 1 to replication server 1
|
* - Create and connect client 2 to replication server 2
|
* - Make client1 publish changes
|
* - Check that client 2 receives the changes published by client 1
|
* Then
|
* - Change the config of replication server 1 to no more be connected
|
* to server 2
|
* - Make client 1 publish a change
|
* - Check that client 2 does not receive the change.
|
*/
|
@Test(enabled = true, groups = "opendj-256")
|
public void replicationServerConnected() throws Exception
|
{
|
clearChangelogDB(replicationServer);
|
TestCaseUtils.initializeTestBackend(true);
|
|
debugInfo("Starting replicationServerConnected");
|
ReplicationBroker broker1 = null;
|
ReplicationBroker broker2 = null;
|
|
// - Create 2 connected replicationServer
|
ReplicationServer[] changelogs = new ReplicationServer[2];
|
int[] changelogPorts = TestCaseUtils.findFreePorts(2);
|
int[] changelogIds = new int[] { 90, 91 };
|
int[] brokerIds = new int[] { 100, 101 };
|
|
for (int i = 0; i < 2; i++)
|
{
|
changelogs[i] = null;
|
// create the 2 replicationServer
|
// and connect the first one to the other one
|
SortedSet<String> servers = new TreeSet<>();
|
|
// Connect only replicationServer[0] to ReplicationServer[1]
|
// and not the other way
|
if (i==0)
|
{
|
servers.add("localhost:" + changelogPorts[1]);
|
}
|
ReplServerFakeConfiguration conf = newReplServerConfiguration(changelogPorts, changelogIds, servers, i,
|
"replicationServerTestReplicationServerConnectedDb"+i);
|
changelogs[i] = new ReplicationServer(conf);
|
}
|
|
try
|
{
|
// Create and connect client1 to changelog1
|
// and client2 to changelog2
|
broker1 = openReplicationSession(TEST_ROOT_DN, brokerIds[0], 100, changelogPorts[0], 1000);
|
broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[1], 1000);
|
|
// - Test messages between clients by publishing now
|
CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
|
|
// - Add
|
AddMsg addMsg = addMsg(csnGen);
|
broker1.publish(addMsg);
|
|
// - Modify
|
ModifyMsg modMsg = modMsg(csnGen);
|
broker1.publish(modMsg);
|
|
// - Check msg received by broker, through changeLog2
|
List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 2);
|
Assertions.assertThat(msgs).containsExactly(addMsg, modMsg);
|
|
// Then change the config to remove replicationServer[1] from
|
// the configuration of replicationServer[0]
|
|
SortedSet<String> servers = new TreeSet<>();
|
// Configure replicationServer[0] to be disconnected from ReplicationServer[1]
|
ReplServerFakeConfiguration conf =
|
newReplServerConfiguration(changelogPorts, changelogIds, servers, 0, "changelogDb0");
|
changelogs[0].applyConfigurationChange(conf);
|
|
// The link between RS[0] & RS[1] should be destroyed by the new configuration.
|
// So we expect a timeout exception when calling receive on RS[1].
|
// Send an update and check that RS[1] does not receive the message after the timeout
|
|
// - Del
|
DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), addMsg.getEntryUUID());
|
broker1.publish(delMsg);
|
// Should receive some TopologyMsg messages for disconnection between the 2 RSs
|
assertOnlyTopologyMsgsReceived(broker2);
|
}
|
finally
|
{
|
remove(changelogs);
|
stop(broker1, broker2);
|
}
|
}
|
|
private ReplServerFakeConfiguration newReplServerConfiguration(int[] changelogPorts, int[] changelogIds,
|
SortedSet<String> servers, int i, String dirName)
|
{
|
return new ReplServerFakeConfiguration(changelogPorts[i], dirName, 0, changelogIds[i], 0, 100, servers);
|
}
|
|
|
private ModifyMsg modMsg(CSNGenerator csnGen)
|
{
|
List<Modification> mods = Arrays.asList(new Modification(REPLACE, Attributes.create("description", "new value")));
|
return new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
|
}
|
|
private void assertOnlyTopologyMsgsReceived(ReplicationBroker broker2)
|
{
|
try
|
{
|
while (true)
|
{
|
ReplicationMsg msg = broker2.receive();
|
if (msg instanceof TopologyMsg)
|
{
|
debugInfo("Broker 2 received: " + msg);
|
}
|
else
|
{
|
fail("Broker: receive successed when it should fail. "
|
+ "This broker was disconnected by configuration."
|
+ " Received: " + msg);
|
}
|
}
|
}
|
catch (SocketTimeoutException expected)
|
{
|
debugInfo("Ending replicationServerConnected");
|
}
|
}
|
}
|