/*
|
* The contents of this file are subject to the terms of the Common Development and
|
* Distribution License (the License). You may not use this file except in compliance with the
|
* License.
|
*
|
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
|
* specific language governing permission and limitations under the License.
|
*
|
* When distributing Covered Software, include this CDDL Header Notice in each file and include
|
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
|
* Header, with the fields enclosed by brackets [] replaced by your own identifying
|
* information: "Portions Copyright [year] [name of copyright owner]".
|
*
|
* Copyright 2009-2010 Sun Microsystems, Inc.
|
* Portions Copyright 2011-2016 ForgeRock AS.
|
*/
|
package org.opends.server.replication.plugin;
|
|
import static java.util.concurrent.TimeUnit.*;
|
|
import static org.mockito.Mockito.*;
|
import static org.opends.server.util.CollectionUtils.*;
|
import static org.testng.Assert.*;
|
|
import java.io.IOException;
|
import java.net.SocketTimeoutException;
|
import java.util.SortedSet;
|
import java.util.TreeSet;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import org.forgerock.i18n.LocalizableMessage;
|
import org.forgerock.i18n.slf4j.LocalizedLogger;
|
import org.forgerock.opendj.config.server.ConfigurationChangeListener;
|
import org.forgerock.opendj.ldap.DN;
|
import org.forgerock.opendj.server.config.server.ReplicationSynchronizationProviderCfg;
|
import org.forgerock.opendj.server.config.server.SynchronizationProviderCfg;
|
import org.opends.server.TestCaseUtils;
|
import org.opends.server.api.SynchronizationProvider;
|
import org.opends.server.core.DirectoryServer;
|
import org.opends.server.replication.ReplicationTestCase;
|
import org.opends.server.replication.common.CSNGenerator;
|
import org.opends.server.replication.common.DSInfo;
|
import org.opends.server.replication.common.ServerState;
|
import org.opends.server.replication.common.ServerStatus;
|
import org.opends.server.replication.protocol.AddMsg;
|
import org.opends.server.replication.protocol.DoneMsg;
|
import org.opends.server.replication.protocol.EntryMsg;
|
import org.opends.server.replication.protocol.InitializeTargetMsg;
|
import org.opends.server.replication.protocol.ReplSessionSecurity;
|
import org.opends.server.replication.protocol.ReplicationMsg;
|
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
|
import org.opends.server.replication.protocol.RoutableMsg;
|
import org.opends.server.replication.server.ReplServerFakeConfiguration;
|
import org.opends.server.replication.server.ReplicationServer;
|
import org.opends.server.replication.service.ReplicationBroker;
|
import org.opends.server.types.Entry;
|
import org.opends.server.util.TestTimer;
|
import org.opends.server.util.TestTimer.CallableVoid;
|
import org.testng.annotations.AfterClass;
|
import org.testng.annotations.BeforeClass;
|
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.Test;
|
|
/**
|
* Some tests to go through the DS state machine and validate we get the
|
* expected status according to the actions we perform.
|
*/
|
@SuppressWarnings("javadoc")
|
public class StateMachineTest extends ReplicationTestCase
|
{
|
/** Server id definitions. */
|
private static final String EXAMPLE_DN = "dc=example,dc=com";
|
private static DN EXAMPLE_DN_;
|
|
private static final int DS1_ID = 1;
|
private static final int DS2_ID = 2;
|
private static final int DS3_ID = 3;
|
private static final int RS1_ID = 41;
|
private int rs1Port = -1;
|
private LDAPReplicationDomain ds1;
|
private ReplicationBroker ds2;
|
private ReplicationBroker ds3;
|
private ReplicationServer rs1;
|
/** The tracer object for the debug logger. */
|
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
|
private final int initWindow = 100;
|
|
private void debugInfo(String s)
|
{
|
logger.error(LocalizableMessage.raw(s));
|
if (logger.isTraceEnabled())
|
{
|
logger.trace("** TEST **" + s);
|
}
|
}
|
|
private static void shutdown(BrokerReader reader)
|
{
|
if (reader != null)
|
{
|
reader.shutdown();
|
}
|
}
|
|
private static void shutdown(BrokerWriter writer)
|
{
|
if (writer != null)
|
{
|
writer.shutdown();
|
}
|
}
|
|
private void initTest() throws IOException
|
{
|
rs1Port = -1;
|
ds1 = null;
|
ds2 = null;
|
ds3 = null;
|
rs1Port = TestCaseUtils.findFreePort();
|
}
|
|
private void endTest() throws Exception
|
{
|
if (ds1 != null)
|
{
|
ds1.shutdown();
|
ds1 = null;
|
}
|
|
// Clear any reference to a domain in synchro plugin
|
MultimasterReplication.deleteDomain(EXAMPLE_DN_);
|
stop(ds2, ds3);
|
ds2 = ds3 = null;
|
remove(rs1);
|
rs1 = null;
|
rs1Port = -1;
|
}
|
|
/** Waits until the provided ds is connected to the replication server. */
|
private void waitUntiConnected(final int dsId) throws Exception
|
{
|
TestTimer timer = new TestTimer.Builder()
|
.maxSleep(30, SECONDS)
|
.sleepTimes(100, MILLISECONDS)
|
.toTimer();
|
timer.repeatUntilSuccess(new CallableVoid()
|
{
|
@Override
|
public void call() throws Exception
|
{
|
assertTrue(isConnected(dsId), "checkConnection: DS " + dsId + " is not connected to the RS");
|
}
|
});
|
}
|
|
private boolean isConnected(final int dsId)
|
{
|
switch (dsId)
|
{
|
case DS1_ID:
|
return ds1.isConnected();
|
case DS2_ID:
|
return ds2.isConnected();
|
case DS3_ID:
|
return ds3.isConnected();
|
default:
|
fail("Unknown ds server id.");
|
return false;
|
}
|
}
|
|
/** Creates a new ReplicationServer. */
|
private ReplicationServer createReplicationServer(String testCase,
|
int degradedStatusThreshold) throws Exception
|
{
|
SortedSet<String> replServers = new TreeSet<>();
|
|
String dir = "stateMachineTest" + RS1_ID + testCase + "Db";
|
ReplServerFakeConfiguration conf =
|
new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID, 0,
|
100, replServers, 1, 1000, degradedStatusThreshold);
|
return new ReplicationServer(conf);
|
}
|
|
/** Creates and starts a new ReplicationDomain configured for the replication server. */
|
@SuppressWarnings("unchecked")
|
private LDAPReplicationDomain createReplicationDomain(int dsId) throws Exception
|
{
|
SortedSet<String> replServers = new TreeSet<>();
|
replServers.add("localhost:" + rs1Port);
|
|
DomainFakeCfg domainConf = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers);
|
LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf);
|
replicationDomain.start();
|
SynchronizationProvider<SynchronizationProviderCfg> provider =
|
DirectoryServer.getSynchronizationProviders().get(0);
|
if (provider instanceof ConfigurationChangeListener)
|
{
|
ConfigurationChangeListener<ReplicationSynchronizationProviderCfg> mmr =
|
(ConfigurationChangeListener<ReplicationSynchronizationProviderCfg>) provider;
|
mmr.applyConfigurationChange(mock(ReplicationSynchronizationProviderCfg.class));
|
}
|
|
return replicationDomain;
|
}
|
|
/**
|
* Create and connect a replication broker to the replication server with the
|
* given state and generation id.
|
*/
|
private ReplicationBroker createReplicationBroker(int dsId,
|
ServerState state, long generationId) throws Exception
|
{
|
SortedSet<String> replServers = newTreeSet("localhost:" + rs1Port);
|
DomainFakeCfg fakeCfg = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers);
|
fakeCfg.setHeartbeatInterval(0);
|
fakeCfg.setChangetimeHeartbeatInterval(500);
|
ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
|
ReplicationBroker broker = new ReplicationBroker(
|
new DummyReplicationDomain(generationId), state, fakeCfg, security);
|
broker.start();
|
checkConnection(30, broker);
|
return broker;
|
}
|
|
/**
|
* Make simple state machine test.
|
*
|
* NC = Not connected status
|
* N = Normal status
|
* D = Degraded status
|
* FU = Full update status
|
* BG = Bad generation id status
|
*
|
* The test path should be:
|
* ->NC->N->NC
|
* @throws Exception If a problem occurred
|
*/
|
@Test(enabled=true)
|
public void testStateMachineBasic() throws Exception
|
{
|
String testCase = "testStateMachineBasic";
|
|
debugInfo("Starting " + testCase);
|
|
initTest();
|
|
try
|
{
|
// DS1 start, no RS available: DS1 should be in not connected status
|
ds1 = createReplicationDomain(DS1_ID);
|
waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
|
|
// RS1 starts , DS1 should connect to it and be in normal status
|
rs1 = createReplicationServer(testCase, 5000);
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
// RS1 stops, DS1 should go in not connected status
|
rs1.remove();
|
waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
|
} finally
|
{
|
endTest();
|
}
|
}
|
|
/** Returns various init values for test testStateMachineStatusAnalyzer. */
|
@DataProvider(name="stateMachineStatusAnalyzerTestProvider")
|
public Object [][] stateMachineStatusAnalyzerTestProvider() throws Exception
|
{
|
return new Object [][] { {1} , {10}, {50}, {120} };
|
}
|
|
/**
|
* Test the status analyzer system that allows to go from normal to degraded
|
* and vice versa, using the configured threshold value
|
*
|
* NC = Not connected status
|
* N = Normal status
|
* D = Degraded status
|
* FU = Full update status
|
* BG = Bad generation id status
|
*
|
* Expected path:
|
* ->NC->N->D->N->NC
|
* @throws Exception If a problem occurred
|
*/
|
@Test(enabled=true, groups="slow", dataProvider="stateMachineStatusAnalyzerTestProvider")
|
public void testStateMachineStatusAnalyzer(int thresholdValue) throws Throwable
|
{
|
String testCase = "testStateMachineStatusAnalyzer with threhold " + thresholdValue;
|
|
debugInfo("Starting " + testCase + " with " + thresholdValue);
|
|
initTest();
|
|
BrokerReader br3 = null;
|
BrokerReader br2 = null;
|
BrokerWriter bw = null;
|
|
try
|
{
|
/* RS1 starts with specified threshold value */
|
rs1 = createReplicationServer(testCase, thresholdValue);
|
|
/*
|
* DS2 starts and connects to RS1. No reader and low window value at the beginning
|
* so writer for DS2 in RS should enqueue changes after first changes sent to DS.
|
* (window value reached: a window msg needed by RS for following sending changes to DS)
|
*/
|
ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
|
waitUntiConnected(DS2_ID);
|
|
/* DS3 starts and connects to RS1 */
|
ds3 = createReplicationBroker(DS3_ID, new ServerState(), EMPTY_DN_GENID);
|
br3 = new BrokerReader(ds3, DS3_ID);
|
waitUntiConnected(DS3_ID);
|
|
// Send first changes to reach window and block DS2 writer queue. Writer will take them
|
// from queue and block (no more changes removed from writer queue) after
|
// having sent them to TCP receive queue of DS2.
|
bw = new BrokerWriter(ds3, DS3_ID, false);
|
bw.followAndPause(11);
|
|
/*
|
* DS3 sends changes (less than threshold): DS2 should still be in normal status
|
* so no topo message should be sent (update topo message for telling status of DS2 changed)
|
*/
|
int nChangesSent = 0;
|
if (thresholdValue > 1)
|
{
|
nChangesSent = thresholdValue - 1;
|
bw.followAndPause(nChangesSent);
|
Thread.sleep(1000); // Be sure status analyzer has time to test
|
ReplicationMsg msg = br3.getLastMsg();
|
debugInfo(testCase + " Step 1: last message from writer: " + msg);
|
assertNull(msg, (msg != null) ? msg.toString() : "null");
|
}
|
|
/*
|
* DS3 sends changes to reach the threshold value,
|
* DS3 should receive an update topo message with status of DS2: degraded status
|
*/
|
bw.followAndPause(thresholdValue - nChangesSent);
|
// wait for a status MSG status analyzer to broker 3
|
waitForDegradedStatusOnBroker3();
|
|
/*
|
* DS3 sends 10 additional changes after threshold value,
|
* DS2 should still be degraded so no topo message received.
|
*/
|
bw.followAndPause(10);
|
shutdown(bw);
|
Thread.sleep(1000); // Be sure status analyzer has time to test
|
ReplicationMsg lastMsg = br3.getLastMsg();
|
ReplicationMsg msg = br3.getLastMsg();
|
debugInfo(testCase + " Step 3: last message from writer: " + msg);
|
assertNull(lastMsg);
|
|
/*
|
* DS2 replays every changes and should go back to normal status
|
* (create a reader to emulate replay of messages (messages read from queue))
|
*/
|
br2 = new BrokerReader(ds2, DS2_ID);
|
// wait for a status MSG status analyzer to broker 3
|
waitForDegradedStatusOnBroker3();
|
} finally
|
{
|
endTest();
|
shutdown(bw);
|
shutdown(br3);
|
shutdown(br2);
|
}
|
}
|
|
private void waitForDegradedStatusOnBroker3() throws InterruptedException
|
{
|
for (int count = 0; count< 50; count++)
|
{
|
DSInfo dsInfo = ds3.getReplicaInfos().get(DS2_ID);
|
if (dsInfo != null && dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS)
|
{
|
break;
|
}
|
|
assertTrue(count < 50, "DS2 did not get degraded : " + dsInfo);
|
Thread.sleep(200); // Be sure status analyzer has time to test
|
}
|
}
|
|
/**
|
* Go through the possible state machine transitions:
|
*
|
* NC = Not connected status
|
* N = Normal status
|
* D = Degraded status
|
* FU = Full update status
|
* BG = Bad generation id status
|
*
|
* The test path should be:
|
* ->NC->D->N->NC->N->D->NC->D->N->BG->NC->N->D->BG->FU->NC->N->D->FU->NC->BG->NC->N->FU->NC->N->NC
|
* @throws Exception If a problem occurred
|
*/
|
@Test(enabled = false, groups = "slow")
|
public void testStateMachineFull() throws Exception
|
{
|
String testCase = "testStateMachineFull";
|
|
debugInfo("Starting " + testCase);
|
|
initTest();
|
BrokerReader br = null;
|
BrokerWriter bw = null;
|
|
try
|
{
|
int DEGRADED_STATUS_THRESHOLD = 1;
|
|
// RS1 starts with 1 message as degraded status threshold value
|
rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
|
|
// DS2 starts and connects to RS1
|
ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
|
br = new BrokerReader(ds2, DS2_ID);
|
waitUntiConnected(DS2_ID);
|
|
// DS2 starts sending a lot of changes
|
bw = new BrokerWriter(ds2, DS2_ID, false);
|
bw.follow();
|
Thread.sleep(1000); // Let some messages being queued in RS
|
|
/*
|
* DS1 starts and connects to RS1, server state exchange should lead to start in degraded status
|
* as some changes should be in queued in the RS and the threshold value is 1 change in queue.
|
*/
|
ds1 = createReplicationDomain(DS1_ID);
|
waitUntiConnected(DS1_ID);
|
waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
|
|
/* DS2 stops sending changes: DS1 should replay pending changes and should enter the normal status */
|
bw.pause();
|
// Sleep enough so that replay can be done and analyzer has time
|
// to see that the queue length is now under the threshold value.
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/* RS1 stops to make DS1 go to not connected status (from normal status) */
|
rs1.remove();
|
waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
|
|
/*
|
* DS2 restarts with up to date server state
|
* (this allows to have restarting RS1 not sending him some updates he already sent)
|
*/
|
ds2.stop();
|
shutdown(bw);
|
shutdown(br);
|
ServerState curState = ds1.getServerState();
|
ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
|
br = new BrokerReader(ds2, DS2_ID);
|
|
/* RS1 restarts, DS1 should get back to normal status */
|
rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
|
waitUntiConnected(DS2_ID);
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/* DS2 sends again a lot of changes to make DS1 degraded again */
|
bw = new BrokerWriter(ds2, DS2_ID, false);
|
bw.follow();
|
Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
|
waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
|
|
/* RS1 stops to make DS1 go to not connected status (from degraded status) */
|
rs1.remove();
|
bw.pause();
|
waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
|
|
/*
|
* DS2 restarts with up to date server state
|
* (this allows to have restarting RS1 not sending him some updates he already sent)
|
*/
|
ds2.stop();
|
shutdown(bw);
|
shutdown(br);
|
curState = ds1.getServerState();
|
ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
|
br = new BrokerReader(ds2, DS2_ID);
|
|
/*
|
* RS1 restarts, DS1 should reconnect in degraded status
|
* (from not connected this time, not from state machine entry)
|
*/
|
rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
|
// It is too difficult to tune the right sleep so disabling this test:
|
// Sometimes the status analyzer may be fast and quickly change the status
|
// of DS1 to NORMAL_STATUS
|
//sleep(2000);
|
//sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
|
waitUntiConnected(DS2_ID);
|
|
/* DS1 should come back in normal status after a while */
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/*
|
* DS2 sends a reset gen id order with wrong gen id:
|
* DS1 should go into bad generation id status
|
*/
|
long BAD_GEN_ID = 999999L;
|
resetGenId(ds2, BAD_GEN_ID); // ds2 will also go bad gen
|
waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
|
|
/*
|
* DS2 sends again a reset gen id order with right id: DS1 should be disconnected by RS
|
* then reconnect and enter again in normal status. This goes through not connected status
|
* but not possible to check as should reconnect immediately
|
*/
|
resetGenId(ds2, EMPTY_DN_GENID); // ds2 will also be disconnected
|
ds2.stop();
|
shutdown(br); // Reader could reconnect broker, but gen id would be bad: need to recreate a
|
// broker to send changex
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/* DS2 sends again a lot of changes to make DS1 degraded again */
|
curState = ds1.getServerState();
|
ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
|
waitUntiConnected(DS2_ID);
|
bw = new BrokerWriter(ds2, DS2_ID, false);
|
br = new BrokerReader(ds2, DS2_ID);
|
bw.follow();
|
Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
|
waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
|
|
/*
|
* DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id status
|
* (from degraded status this time)
|
*/
|
// -1 to allow next step full update and flush RS db so that DS1 can reconnect after full
|
// update
|
resetGenId(ds2, -1);
|
waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
|
bw.pause();
|
|
/*
|
* DS2 engages full update (while DS1 in bad gen id status),
|
* DS1 should go in full update status
|
*/
|
BrokerInitializer bi = new BrokerInitializer(ds2, DS2_ID, false);
|
bi.initFullUpdate(DS1_ID, 200);
|
waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
|
|
/*
|
* DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
|
* and come back to normal status (RS genid was -1 so RS will adopt new gen id)
|
*/
|
bi.runFullUpdate();
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/* DS2 sends changes to DS1: DS1 should go in degraded status */
|
ds2.stop(); // will need a new broker with another gen id restart it
|
shutdown(bw);
|
shutdown(br);
|
long newGen = ds1.getGenerationID();
|
curState = ds1.getServerState();
|
ds2 = createReplicationBroker(DS2_ID, curState, newGen);
|
waitUntiConnected(DS2_ID);
|
bw = new BrokerWriter(ds2, DS2_ID, false);
|
br = new BrokerReader(ds2, DS2_ID);
|
bw.follow();
|
Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
|
waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
|
|
/* DS2 engages full update (while DS1 in degraded status), DS1 should go in full update status */
|
bi = new BrokerInitializer(ds2, DS2_ID, false);
|
bi.initFullUpdate(DS1_ID, 300);
|
waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
|
bw.pause();
|
|
/*
|
* DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
|
* and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200)
|
*/
|
bi.runFullUpdate();
|
waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
|
|
/*
|
* DS2 sends reset gen id with gen id same as DS1:
|
* DS1 will be disconnected by RS (not connected status) and come back to normal status
|
*/
|
ds2.stop(); // will need a new broker with another gen id restart it
|
shutdown(bw);
|
shutdown(br);
|
newGen = ds1.getGenerationID();
|
curState = ds1.getServerState();
|
ds2 = createReplicationBroker(DS2_ID, curState, newGen);
|
waitUntiConnected(DS2_ID);
|
br = new BrokerReader(ds2, DS2_ID);
|
resetGenId(ds2, newGen); // Make DS1 reconnect in normal status
|
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/* DS2 engages full update (while DS1 in normal status), DS1 should go in full update status */
|
bi = new BrokerInitializer(ds2, DS2_ID, false);
|
bi.initFullUpdate(DS1_ID, 300); // 300 entries will compute same genid of the RS
|
waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
|
|
/*
|
* DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
|
* and come back to normal status (process full update with same data as before so RS already
|
* has right gen id: version with 300 entries)
|
*/
|
bi.runFullUpdate();
|
ds2.stop();
|
shutdown(br);
|
waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
|
|
/* RS1 stops, DS1 should go to not connected status */
|
rs1.remove();
|
waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
|
} finally
|
{
|
// Finalize test
|
endTest();
|
shutdown(bw);
|
shutdown(br);
|
}
|
}
|
|
/**
|
* Set up the environment.
|
*
|
* @throws Exception
|
* If the environment could not be set up.
|
*/
|
@BeforeClass
|
@Override
|
public void setUp() throws Exception
|
{
|
super.setUp();
|
EXAMPLE_DN_ = DN.valueOf(EXAMPLE_DN);
|
|
// Note: this test does not use the memory test backend as for having a DS
|
// going into degraded status, we need to send a lot of updates. This makes
|
// the memory test backend crash with OutOfMemoryError. So we prefer here
|
// a backend backed up with a file
|
TestCaseUtils.clearBackend("userRoot");
|
}
|
|
/**
|
* Clean up the environment.
|
*
|
* @throws Exception If the environment could not be set up.
|
*/
|
@AfterClass
|
@Override
|
public void classCleanUp() throws Exception
|
{
|
callParanoiaCheck = false;
|
super.classCleanUp();
|
|
TestCaseUtils.clearBackend("userRoot");
|
|
paranoiaCheck();
|
}
|
|
/**
|
* Sends a reset genid message through the given replication broker, with the
|
* given new generation id.
|
*/
|
private void resetGenId(ReplicationBroker rb, long newGenId)
|
{
|
ResetGenerationIdMsg resetMsg = new ResetGenerationIdMsg(newGenId);
|
rb.publish(resetMsg);
|
}
|
|
/**
|
* Utility class for making a full update through a broker. No separated thread
|
* Usage:
|
* BrokerInitializer bi = new BrokerInitializer(rb, sid, nEntries);
|
* bi.initFullUpdate(); // Initializes a full update session by sending InitializeTargetMsg
|
* bi.runFullUpdate(); // loops sending nEntries entries and finalizes the full update by sending the EntryDoneMsg
|
*/
|
private class BrokerInitializer
|
{
|
private ReplicationBroker rb;
|
private int serverId = -1;
|
private long userId;
|
/** Server id of server to initialize. */
|
private int destId = -1;
|
/** Number of entries to send to dest. */
|
private long nEntries = -1;
|
private boolean createReader;
|
|
/**
|
* If the BrokerInitializer is to be used for a lot of entries to send
|
* (which is often the case), the reader thread should be enabled to make
|
* the window subsystem work and allow the broker to send as much entries as
|
* he wants. If not enabled, the user is responsible to call the receive
|
* method of the broker himself.
|
*/
|
private BrokerReader reader;
|
|
/** Creates a broker initializer. Also creates a reader according to request */
|
public BrokerInitializer(ReplicationBroker rb, int serverId,
|
boolean createReader)
|
{
|
this.rb = rb;
|
this.serverId = serverId;
|
this.createReader = createReader;
|
}
|
|
/** Initializes a full update session by sending InitializeTargetMsg. */
|
public void initFullUpdate(int destId, long nEntries)
|
{
|
// Also create reader ?
|
if (createReader)
|
{
|
reader = new BrokerReader(rb, serverId);
|
}
|
|
debugInfo("Broker " + serverId + " initializer sending InitializeTargetMsg to server " + destId);
|
|
this.destId = destId;
|
this.nEntries = nEntries;
|
|
// Send init msg to warn dest server it is going do be initialized
|
RoutableMsg initTargetMsg = new InitializeTargetMsg(
|
EXAMPLE_DN_, serverId, destId, serverId, nEntries, initWindow);
|
rb.publish(initTargetMsg);
|
|
// Send top entry for the domain
|
String topEntry = "dn: " + EXAMPLE_DN + "\n"
|
+ "objectClass: top\n"
|
+ "objectClass: domain\n"
|
+ "dc: example\n"
|
+ "entryUUID: 11111111-1111-1111-1111-111111111111\n\n";
|
EntryMsg entryMsg = new EntryMsg(serverId, destId, topEntry.getBytes(), 1);
|
rb.publish(entryMsg);
|
}
|
|
private EntryMsg createNextEntryMsg()
|
{
|
String userEntryUUID = "11111111-1111-1111-1111-111111111111";
|
long curId = ++userId;
|
String userdn = "uid=full_update_user" + curId + "," + EXAMPLE_DN;
|
String entryWithUUIDldif = "dn: " + userdn + "\n" + "objectClass: top\n" +
|
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
|
"objectClass: inetOrgPerson\n" +
|
"uid: full_update_user" + curId + "\n" +
|
"homePhone: 951-245-7634\n" +
|
"description: This is the description for Aaccf Amar.\n" + "st: NC\n" +
|
"mobile: 027-085-0537\n" +
|
"postalAddress: Aaccf Amar$17984 Thirteenth Street" +
|
"$Rockford, NC 85762\n" + "mail: user.1@example.com\n" +
|
"cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" +
|
"street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" +
|
"employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" +
|
"postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" +
|
"entryUUID: " + userEntryUUID + "\n\n";
|
// -> WARNING: EntryMsg PDUs are concatenated before calling import on LDIF
|
// file so need \n\n to separate LDIF entries to conform to LDIF file format
|
|
// Create an entry message
|
return new EntryMsg(serverId, destId, entryWithUUIDldif.getBytes(),
|
(int) userId);
|
}
|
|
/**
|
* Loops sending entries for full update (EntryMsg messages). When
|
* terminates, sends the EntryDoneMsg to finalize full update. Number of
|
* sent entries is determined at initFullUpdate call time.
|
*/
|
public void runFullUpdate()
|
{
|
debugInfo("Broker " + serverId + " initializer starting sending entries to server " + destId);
|
|
for(long i = 0 ; i<nEntries ; i++) {
|
EntryMsg entryMsg = createNextEntryMsg();
|
rb.publish(entryMsg);
|
}
|
|
debugInfo("Broker " + serverId + " initializer stopping sending entries");
|
|
debugInfo("Broker " + serverId + " initializer sending EntryDoneMsg");
|
DoneMsg doneMsg = new DoneMsg(serverId, destId);
|
rb.publish(doneMsg);
|
|
if (createReader)
|
{
|
shutdown(reader);
|
}
|
|
debugInfo("Broker " + serverId + " initializer thread is dying");
|
}
|
}
|
|
/** Thread for sending a lot of changes through a broker. */
|
private class BrokerWriter extends Thread
|
{
|
private ReplicationBroker rb;
|
private int serverId = -1;
|
private long userId;
|
private AtomicBoolean shutdown = new AtomicBoolean(false);
|
/** The writer starts suspended. */
|
private AtomicBoolean suspended = new AtomicBoolean(true);
|
/**
|
* Tells a sending session is finished. A session is sending messages
|
* between the follow and the pause calls, or the time a followAndPause
|
* method runs.
|
*/
|
private AtomicBoolean sessionDone = new AtomicBoolean(true);
|
private boolean careAboutAmountOfChanges;
|
/** Number of sent changes. */
|
private int nChangesSent;
|
private int nChangesSentLimit;
|
private CSNGenerator gen;
|
private Object sleeper = new Object();
|
/**
|
* If the BrokerWriter is to be used for a lot of changes to send (which is
|
* often the case), the reader thread should be enabled to make the window
|
* subsystem work and allow the broker to send as much changes as he wants.
|
* If not enabled, the user is responsible to call the receive method of
|
* the broker himself.
|
*/
|
private BrokerReader reader;
|
|
/** Creates a broker writer. Also creates a reader according to request */
|
public BrokerWriter(ReplicationBroker rb, int serverId,
|
boolean createReader)
|
{
|
super("BrokerWriter for broker " + serverId);
|
this.rb = rb;
|
this.serverId = serverId;
|
// Create a csn generator to generate new csns
|
// when we need to send changes
|
gen = new CSNGenerator(serverId, 0);
|
|
// Start thread (is paused by default so will have to call follow anyway)
|
start();
|
|
// Also create reader ?
|
if (createReader)
|
{
|
reader = new BrokerReader(rb, serverId);
|
}
|
}
|
|
/**
|
* Loops sending changes: add operations creating users with different ids
|
* This starts paused and has to be resumed calling a follow method.
|
*/
|
@Override
|
public void run()
|
{
|
boolean dbg1Written = false, dbg2Written;
|
// No stop msg when entering the loop (thread starts with paused writer)
|
dbg2Written = true;
|
while (!shutdown.get())
|
{
|
long startSessionTime = -1;
|
boolean startedNewSession = false;
|
// When not in pause, loop sending changes to RS
|
while (!suspended.get())
|
{
|
startedNewSession = true;
|
if (!dbg1Written)
|
{
|
startSessionTime = System.currentTimeMillis();
|
debugInfo("Broker " + serverId +
|
" writer starting sending changes session at: " + startSessionTime);
|
dbg1Written = true;
|
dbg2Written = false;
|
}
|
AddMsg addMsg = createNextAddMsg();
|
rb.publish(addMsg);
|
// End session if amount of changes sent has been requested
|
if (careAboutAmountOfChanges)
|
{
|
nChangesSent++;
|
if (nChangesSent == nChangesSentLimit)
|
{
|
// Requested number of changes to send sent, end session
|
debugInfo("Broker " + serverId + " writer reached " +
|
nChangesSent + " changes limit");
|
suspended.set(true);
|
break;
|
}
|
}
|
}
|
if (!dbg2Written)
|
{
|
long endSessionTime = System.currentTimeMillis();
|
debugInfo("Broker " + serverId +
|
" writer stopping sending changes session at: " + endSessionTime +
|
" (duration: " + (endSessionTime - startSessionTime) + " ms)");
|
dbg1Written = false;
|
dbg2Written = true;
|
}
|
// Mark session is finished
|
if (startedNewSession)
|
{
|
sessionDone.set(true);
|
}
|
|
try
|
{
|
// Writer in pause, sleep a while to let other threads work
|
synchronized(sleeper)
|
{
|
sleeper.wait(1000);
|
}
|
} catch (InterruptedException ex)
|
{
|
/* Don't care */
|
}
|
}
|
debugInfo("Broker " + serverId + " writer thread is dying");
|
}
|
|
/** Stops the writer thread. */
|
public void shutdown()
|
{
|
suspended.set(true); // If were working
|
shutdown.set(true);
|
synchronized (sleeper)
|
{
|
sleeper.notify();
|
}
|
try
|
{
|
join();
|
} catch (InterruptedException ex)
|
{
|
/* Don't care */
|
}
|
|
// Stop reader if any
|
StateMachineTest.shutdown(reader);
|
}
|
|
/** Suspends the writer thread. */
|
public void pause()
|
{
|
if (isPaused())
|
{
|
return; // Already suspended
|
}
|
suspended.set(true);
|
// Wait for all messages sent
|
while (!sessionDone.get())
|
{
|
TestCaseUtils.sleep(200);
|
}
|
}
|
|
/** Test if the writer is suspended. */
|
public boolean isPaused()
|
{
|
return sessionDone.get();
|
}
|
|
/** Resumes the writer thread until it is paused. */
|
public void follow()
|
{
|
sessionDone.set(false);
|
suspended.set(false);
|
}
|
|
/**
|
* Resumes the writer and suspends it after a given amount of changes has been
|
* sent. If the writer was working it will be paused anyway after the given
|
* amount of changes, starting from the current call time.
|
* -> blocking call
|
*/
|
public void followAndPause(int nChanges)
|
{
|
debugInfo("Requested broker writer " + serverId + " to write " + nChanges + " change(s).");
|
pause(); // If however we were already working
|
|
// Initialize counter system variables
|
nChangesSent = 0;
|
nChangesSentLimit = nChanges;
|
careAboutAmountOfChanges = true;
|
|
// Start session
|
sessionDone.set(false);
|
suspended.set(false);
|
|
// Wait for all messages sent
|
while (!sessionDone.get())
|
{
|
TestCaseUtils.sleep(1000);
|
}
|
careAboutAmountOfChanges = false;
|
}
|
|
private AddMsg createNextAddMsg()
|
{
|
String userEntryUUID = "11111111-1111-1111-1111-111111111111";
|
long curId = userId++;
|
String userdn = "uid=user" + curId + "," + EXAMPLE_DN;
|
Entry personWithUUIDEntry = null;
|
try
|
{
|
// @Formatter:off
|
personWithUUIDEntry = TestCaseUtils.makeEntry(
|
"dn: " + userdn,
|
"objectClass: top",
|
"objectClass: person",
|
"objectClass: organizationalPerson",
|
"objectClass: inetOrgPerson",
|
"uid: user" + curId + "",
|
"homePhone: 951-245-7634",
|
"description: This is the description for Aaccf Amar.",
|
"st: NC",
|
"mobile: 027-085-0537",
|
"postalAddress: Aaccf Amar$17984 Thirteenth Street$Rockford, NC 85762",
|
"mail: user.1@example.com",
|
"cn: Aaccf Amar",
|
"l: Rockford",
|
"pager: 508-763-4246",
|
"street: 17984 Thirteenth Street",
|
"telephoneNumber: 216-564-6748",
|
"employeeNumber: 1",
|
"sn: Amar",
|
"givenName: Aaccf",
|
"postalCode: 85762",
|
"userPassword: password",
|
"initials: AA",
|
"entryUUID: " + userEntryUUID);
|
// @Formatter:on
|
} catch (Exception e)
|
{
|
throw new RuntimeException(e);
|
}
|
|
// Create an update message to add an entry.
|
return new AddMsg(gen.newCSN(),
|
personWithUUIDEntry.getName(),
|
userEntryUUID,
|
null,
|
personWithUUIDEntry.getObjectClassAttribute(),
|
personWithUUIDEntry.getAllAttributes(), null);
|
}
|
}
|
|
/**
|
* This simple reader just throws away the received
|
* messages. It is used on a breaker we want to be able to send or read from some message
|
* with (changes, entries (full update)...). Calling the receive method of the
|
* broker allows to unblock the window mechanism and to send the desired messages.
|
* Calling the updateWindowAfterReplay method allows to send when necessary the
|
* window message to the RS to allow him send other messages he may want to send us.
|
*/
|
private class BrokerReader extends Thread
|
{
|
private ReplicationBroker rb;
|
private int serverId = -1;
|
private boolean shutdown;
|
private ReplicationMsg lastMsg;
|
|
public BrokerReader(ReplicationBroker rb, int serverId)
|
{
|
super("BrokerReader for broker " + serverId);
|
this.rb = rb;
|
this.serverId = serverId;
|
start();
|
}
|
|
/** Loop reading and throwing update messages. */
|
@Override
|
public void run()
|
{
|
while (!shutdown)
|
{
|
try
|
{
|
ReplicationMsg msg = rb.receive(); // Allow more messages to be sent by broker writer
|
rb.updateWindowAfterReplay(); // Allow RS to send more messages to broker
|
if (msg != null)
|
{
|
debugInfo("Broker " + serverId + " reader received: " + msg);
|
}
|
lastMsg = msg;
|
} catch (SocketTimeoutException ex)
|
{
|
if (shutdown)
|
{
|
return;
|
}
|
}
|
}
|
debugInfo("Broker " + serverId + " reader thread is dying");
|
}
|
|
/** Returns last received message from reader When read, last value is cleared. */
|
public ReplicationMsg getLastMsg()
|
{
|
ReplicationMsg toReturn = lastMsg;
|
lastMsg = null;
|
return toReturn;
|
}
|
|
/** Stops reader thread. */
|
public void shutdown()
|
{
|
shutdown = true;
|
|
try
|
{
|
join();
|
} catch (InterruptedException ex)
|
{
|
/* Don't care */
|
}
|
}
|
}
|
|
/**
|
* Waits until the domain status reaches the expected status.
|
* @param domain The domain whose status we want to test
|
* @param expectedStatus The expected domain status
|
*/
|
private void waitUntilStatusEquals(final LDAPReplicationDomain domain, final ServerStatus expectedStatus)
|
throws Exception
|
{
|
assertNotNull(domain);
|
assertNotNull(expectedStatus);
|
|
TestTimer timer = new TestTimer.Builder()
|
.maxSleep(30, SECONDS)
|
.sleepTimes(500, MILLISECONDS)
|
.toTimer();
|
timer.repeatUntilSuccess(new CallableVoid()
|
{
|
@Override
|
public void call() throws Exception
|
{
|
assertEquals(domain.getStatus(), expectedStatus);
|
}
|
});
|
}
|
}
|