/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
import java.lang.Thread.State;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.*;
import com.forgerock.opendj.util.Pair;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
 * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
 * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS
 * does to compute a changeNumber. The tests set up various topologies with
 * their replicas.
 * <p>
 * All tests are written with this layout:
 * <ul>
 * <li>Initial setup where RS is stopped. Data are set into the changelog state
 * DB, the replica DBs and the change number index DB.</li>
 * <li>Simulate RS startup by calling {@link #startCNIndexer()}. This will
 * start the change number indexer thread that will start computing change
 * numbers and inserting them in the change number index db.</li>
 * <li>Send events to the change number indexer thread by publishing update
 * messages, sending heartbeat messages or replica offline messages.</li>
 * </ul>
 */
@SuppressWarnings("javadoc")
public class ChangeNumberIndexerTest extends DirectoryServerTestCase
{
/**
 * Update message used by these tests: on top of the CSN carried by
 * {@link UpdateMsg}, it remembers the base DN it was built for and whether it
 * is only a marker for an "empty cursor" step (see
 * {@code publishUpdateMsg(ReplicatedUpdateMsg...)}).
 */
private static final class ReplicatedUpdateMsg extends UpdateMsg
{
  private final DN baseDN;
  private final boolean emptyCursor;

  /** Builds a regular message, i.e. one that is not an empty cursor marker. */
  public ReplicatedUpdateMsg(DN baseDN, CSN csn)
  {
    this(baseDN, csn, false);
  }

  /**
   * Builds a message for the provided base DN and CSN.
   *
   * @param baseDN
   *          the base DN this message is associated with
   * @param csn
   *          the CSN handed over to {@link UpdateMsg}
   * @param emptyCursor
   *          the value later returned by {@link #isEmptyCursor()}
   */
  public ReplicatedUpdateMsg(DN baseDN, CSN csn, boolean emptyCursor)
  {
    // no payload is needed by the tests, only the CSN matters
    super(csn, null);
    this.emptyCursor = emptyCursor;
    this.baseDN = baseDN;
  }

  /** Returns the base DN provided at construction time. */
  public DN getBaseDN()
  {
    return baseDN;
  }

  /** Returns the empty cursor flag provided at construction time. */
  public boolean isEmptyCursor()
  {
    return emptyCursor;
  }

  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    final StringBuilder sb = new StringBuilder("UpdateMsg(");
    sb.append("\"").append(baseDN).append(" ").append(getCSN().getServerId()).append("\"");
    sb.append(", csn=").append(getCSN().toStringUI());
    sb.append(")");
    return sb.toString();
  }
}
private static DN BASE_DN1;
private static DN BASE_DN2;
private static DN ADMIN_DATA_DN;
private static final int serverId1 = 101;
private static final int serverId2 = 102;
private static final int serverId3 = 103;
@Mock
private ChangelogDB changelogDB;
@Mock
private ChangeNumberIndexDB cnIndexDB;
@Mock
private ReplicationDomainDB domainDB;
private List eclEnabledDomains;
private MultiDomainDBCursor multiDomainCursor;
private Map, SequentialDBCursor> replicaDBCursors;
private Map domainDBCursors;
private ChangelogState initialState;
private Map domainNewestCSNs;
private ChangeNumberIndexer cnIndexer;
private MultiDomainServerState initialCookie;
/**
 * Starts the fake server, then decodes the base DNs shared by all the tests.
 *
 * @throws Exception
 *           if the fake server cannot be started or a DN cannot be decoded
 */
@BeforeClass
public static void classSetup() throws Exception
{
  TestCaseUtils.startFakeServer();

  final DN exampleDN = DN.decode("dc=example,dc=com");
  BASE_DN1 = exampleDN;
  final DN worldDN = DN.decode("dc=world,dc=company");
  BASE_DN2 = worldDN;
  final DN adminDataDN = DN.decode("cn=admin data");
  ADMIN_DATA_DN = adminDataDN;
}
/**
 * Shuts down the fake server once all the tests of this class have run.
 *
 * @throws Exception
 *           if the shutdown fails
 */
@AfterClass
public static void classTearDown() throws Exception
{
TestCaseUtils.shutdownFakeServer();
}
@BeforeMethod
public void setup() throws Exception
{
MockitoAnnotations.initMocks(this);
multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
initialState = new ChangelogState();
initialCookie = new MultiDomainServerState();
replicaDBCursors = new HashMap, SequentialDBCursor>();
domainDBCursors = new HashMap();
domainNewestCSNs = new HashMap();
when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
.thenReturn(multiDomainCursor);
}
/**
 * Stops the change number indexer after each test, if one was started.
 *
 * @throws Exception
 *           if stopping the indexer fails
 */
@AfterMethod
public void tearDown() throws Exception
{
stopCNIndexer();
}
/**
 * Name of the {@link #emptyDBNoDS()} test method, referenced by the
 * {@code dependsOnMethods} attribute of the other tests.
 */
private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";

/**
 * Starts the indexer without any replica and checks that no record was added
 * to the change number index DB.
 */
@Test
public void emptyDBNoDS() throws Exception
{
  eclEnabledDomains = Arrays.asList(BASE_DN1);

  startCNIndexer();

  // no replica, no change: nothing can have been indexed
  assertExternalChangelogContent();
}
/**
 * One replica, empty change number index DB: the single published change is
 * expected to be indexed.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneDS() throws Exception
{
  final ReplicatedUpdateMsg firstChange = msg(BASE_DN1, serverId1, 1);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  startCNIndexer();
  // nothing has been published yet
  assertExternalChangelogContent();

  publishUpdateMsg(firstChange);
  assertExternalChangelogContent(firstChange);
}
/**
 * One replica, with one record already present in the change number index DB
 * before the indexer starts: only the change published afterwards is expected
 * to be added.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBOneDS() throws Exception
{
  final ReplicatedUpdateMsg alreadyIndexed = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg newChange = msg(BASE_DN1, serverId1, 2);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  setCNIndexDBInitialRecords(alreadyIndexed);
  startCNIndexer();
  // the initial record must not be added a second time
  assertExternalChangelogContent();

  publishUpdateMsg(newChange);
  assertExternalChangelogContent(newChange);
}
/**
 * Two replicas of the same domain whose changes are received out of order:
 * nothing is expected in the changelog until both replicas have been heard
 * from.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSs() throws Exception
{
  final ReplicatedUpdateMsg olderFromSid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg newerFromSid2 = msg(BASE_DN1, serverId2, 2);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  startCNIndexer();
  assertExternalChangelogContent();

  // simulate messages received out of order: the newer change comes first
  publishUpdateMsg(newerFromSid2);
  // do not start publishing to the changelog until we hear from serverId1
  assertExternalChangelogContent();

  publishUpdateMsg(olderFromSid1);
  assertExternalChangelogContent(olderFromSid1);
}
/**
 * Two ECL enabled domains with one replica each: the change of the second
 * domain is only expected in the changelog once the first domain has
 * published a newer change.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsDifferentDomains() throws Exception
{
  final ReplicatedUpdateMsg domain1First = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg domain2Change = msg(BASE_DN2, serverId2, 2);
  final ReplicatedUpdateMsg domain1Second = msg(BASE_DN1, serverId1, 3);

  eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN2, serverId2);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(domain1First, domain2Change);
  assertExternalChangelogContent(domain1First);

  publishUpdateMsg(domain1Second);
  assertExternalChangelogContent(domain1First, domain2Change);
}
/**
 * This test tries to reproduce a very subtle implementation bug where:
 * <ol>
 * <li>the change number indexer has no more records to proceed, because all
 * cursors are exhausted, so it calls wait()</li>
 * <li>a new change Upd1 comes in for an exhausted cursor,
 * medium consistency cannot move</li>
 * <li>a new change Upd2 comes in for a cursor that is not already opened,
 * medium consistency can move, so wake up the change number indexer</li>
 * <li>on wake up, the change number indexer calls next(),
 * advancing the CompositeDBCursor, which recycles the exhausted cursor,
 * then calls next() on it, making it lose its change.
 * CompositeDBCursor currentRecord == Upd1.</li>
 * <li>on the next iteration of the loop in run(), a new cursor is created,
 * triggering the creation of a new CompositeDBCursor =&gt; Upd1 is lost.
 * CompositeDBCursor currentRecord == Upd2.</li>
 * </ol>
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
{
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);
  final ReplicatedUpdateMsg change3Sid1 = msg(BASE_DN1, serverId1, 3);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change1Sid1);
  assertExternalChangelogContent(change1Sid1);

  // a second replica joins and first only sends a heartbeat
  addReplica(BASE_DN1, serverId2);
  sendHeartbeat(BASE_DN1, serverId2, 2);
  assertExternalChangelogContent(change1Sid1);

  // publish change that will not trigger a wake up of change number indexer,
  // but will make it open a cursor on next wake up
  publishUpdateMsg(change2Sid2);
  assertExternalChangelogContent(change1Sid1);

  // wake up change number indexer
  publishUpdateMsg(change3Sid1);
  assertExternalChangelogContent(change1Sid1, change2Sid2);

  sendHeartbeat(BASE_DN1, serverId2, 4);
  // assert no changes have been lost
  assertExternalChangelogContent(change1Sid1, change2Sid2, change3Sid1);
}
/**
 * Two replicas of the same domain, with two records already present in the
 * change number index DB before the indexer starts: only changes published
 * afterwards are expected to be added, in CSN order, as both replicas
 * progress.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBTwoDSs() throws Exception
{
  final ReplicatedUpdateMsg indexedSid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg indexedSid2 = msg(BASE_DN1, serverId2, 2);
  final ReplicatedUpdateMsg change3Sid2 = msg(BASE_DN1, serverId2, 3);
  final ReplicatedUpdateMsg change4Sid1 = msg(BASE_DN1, serverId1, 4);
  final ReplicatedUpdateMsg change5Sid1 = msg(BASE_DN1, serverId1, 5);
  final ReplicatedUpdateMsg change6Sid2 = msg(BASE_DN1, serverId2, 6);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  setCNIndexDBInitialRecords(indexedSid1, indexedSid2);
  startCNIndexer();
  // the initial records must not be added a second time
  assertExternalChangelogContent();

  publishUpdateMsg(change3Sid2, change4Sid1);
  assertExternalChangelogContent(change3Sid2);

  // one more change from serverId1 alone does not add anything
  publishUpdateMsg(change5Sid1);
  assertExternalChangelogContent(change3Sid2);

  publishUpdateMsg(change6Sid2);
  assertExternalChangelogContent(change3Sid2, change4Sid1, change5Sid1);
}
/**
 * Two replicas of the same domain, where the cursor of the second replica
 * returns nothing a few times in a row between two of its changes.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
{
  final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
  final ReplicatedUpdateMsg nothingFromSid2 = emptyCursor(BASE_DN1, serverId2);
  final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN1, serverId1, 2);
  final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN1, serverId2, 3);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  startCNIndexer();
  assertExternalChangelogContent();

  // simulate no messages received during some time for replica 2
  publishUpdateMsg(
      msg1Sid2,
      nothingFromSid2, nothingFromSid2, nothingFromSid2,
      msg3Sid2,
      msg2Sid1);
  assertExternalChangelogContent(msg1Sid2, msg2Sid1);
}
/**
 * Three replicas, one of which belongs to a domain ("cn=admin data") that is
 * not ECL enabled: changes of that domain are not expected in the changelog.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
{
  final ReplicatedUpdateMsg adminDataChange = msg(ADMIN_DATA_DN, serverId1, 1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);
  final ReplicatedUpdateMsg change3Sid3 = msg(BASE_DN1, serverId3, 3);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(ADMIN_DATA_DN, serverId1);
  addReplica(BASE_DN1, serverId2);
  addReplica(BASE_DN1, serverId3);
  startCNIndexer();
  assertExternalChangelogContent();

  // cn=admin data does not participate in the external changelog
  // so it cannot add to it
  publishUpdateMsg(adminDataChange);
  assertExternalChangelogContent();

  publishUpdateMsg(change2Sid2, change3Sid3);
  assertExternalChangelogContent(change2Sid2);
}
/**
 * One replica is known when the indexer starts, a second one joins the same
 * domain afterwards and publishes a change; that change is expected in the
 * changelog once the first replica has published a newer change.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
{
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);
  final ReplicatedUpdateMsg change3Sid1 = msg(BASE_DN1, serverId1, 3);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change1Sid1);
  assertExternalChangelogContent(change1Sid1);

  // serverId2 joins while the indexer is running
  addReplica(BASE_DN1, serverId2);
  publishUpdateMsg(change2Sid2);
  assertExternalChangelogContent(change1Sid1);

  publishUpdateMsg(change3Sid1);
  assertExternalChangelogContent(change1Sid1, change2Sid2);
}
/**
 * Same topology as {@link #emptyDBOneInitialDSAnotherDSJoining()}, except
 * that the first replica then sends a heartbeat instead of a new change.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
{
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change1Sid1);

  // serverId2 joins while the indexer is running
  addReplica(BASE_DN1, serverId2);
  publishUpdateMsg(change2Sid2);
  assertExternalChangelogContent(change1Sid1);

  sendHeartbeat(BASE_DN1, serverId1, 3);
  assertExternalChangelogContent(change1Sid1, change2Sid2);
}
/**
 * Two replicas of the same domain: the change of the second replica is
 * expected in the changelog after a newer heartbeat of the first replica.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
{
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change1Sid1, change2Sid2);
  assertExternalChangelogContent(change1Sid1);

  sendHeartbeat(BASE_DN1, serverId1, 3);
  assertExternalChangelogContent(change1Sid1, change2Sid2);
}
/**
 * Two replicas of the same domain, where the second one goes offline and
 * later comes back online by publishing a new change.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneGoingOffline() throws Exception
{
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);
  final ReplicatedUpdateMsg change4Sid1 = msg(BASE_DN1, serverId1, 4);
  final ReplicatedUpdateMsg change5Sid2 = msg(BASE_DN1, serverId2, 5);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change1Sid1, change2Sid2);
  assertExternalChangelogContent(change1Sid1);

  replicaOffline(BASE_DN1, serverId2, 3);
  // MCP cannot move forward since no new updates from serverId1
  assertExternalChangelogContent(change1Sid1);

  publishUpdateMsg(change4Sid1);
  // MCP moves forward after receiving update from serverId1
  // (last replica in the domain)
  assertExternalChangelogContent(change1Sid1, change2Sid2, change4Sid1);

  // serverId2 comes online again
  publishUpdateMsg(change5Sid2);
  // MCP does not move until it knows what happens to serverId1
  assertExternalChangelogContent(change1Sid1, change2Sid2, change4Sid1);

  sendHeartbeat(BASE_DN1, serverId1, 6);
  // MCP moves forward
  assertExternalChangelogContent(change1Sid1, change2Sid2, change4Sid1, change5Sid2);
}
/**
 * Two replicas of the same domain, where the first one is already recorded
 * as offline in the initial changelog state and comes back online later.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
{
  final CSN offlineCSN = new CSN(1, 1, serverId1);
  final ReplicatedUpdateMsg change2Sid2 = msg(BASE_DN1, serverId2, 2);
  final ReplicatedUpdateMsg change3Sid1 = msg(BASE_DN1, serverId1, 3);
  final ReplicatedUpdateMsg change4Sid2 = msg(BASE_DN1, serverId2, 4);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  initialState.addOfflineReplica(BASE_DN1, offlineCSN);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change2Sid2);
  // MCP does not wait for temporarily offline serverId1
  assertExternalChangelogContent(change2Sid2);

  // serverId1 is back online, wait for changes from serverId2
  publishUpdateMsg(change3Sid1);
  assertExternalChangelogContent(change2Sid2);

  publishUpdateMsg(change4Sid2);
  // MCP moves forward
  assertExternalChangelogContent(change2Sid2, change3Sid1);
}
/**
 * Scenario:
 * <ol>
 * <li>Replica 1 publishes one change</li>
 * <li>Replica 1 sends offline message</li>
 * <li>RS stops</li>
 * <li>RS starts</li>
 * </ol>
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
{
  final CSN offlineCSN = new CSN(2, 1, serverId1);
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change4Sid2 = msg(BASE_DN1, serverId2, 4);
  final ReplicatedUpdateMsg change5Sid1 = msg(BASE_DN1, serverId1, 5);
  final ReplicatedUpdateMsg change6Sid2 = msg(BASE_DN1, serverId2, 6);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  // published before the indexer is started: part of the initial state
  publishUpdateMsg(change1Sid1);
  initialState.addOfflineReplica(BASE_DN1, offlineCSN);
  startCNIndexer();
  // blocked until we receive info for serverId2
  assertExternalChangelogContent();

  sendHeartbeat(BASE_DN1, serverId2, 3);
  // MCP moves forward
  assertExternalChangelogContent(change1Sid1);

  // do not wait for temporarily offline serverId1
  publishUpdateMsg(change4Sid2);
  assertExternalChangelogContent(change1Sid1, change4Sid2);

  // serverId1 is back online, wait for changes from serverId2
  publishUpdateMsg(change5Sid1);
  assertExternalChangelogContent(change1Sid1, change4Sid2);

  publishUpdateMsg(change6Sid2);
  // MCP moves forward
  assertExternalChangelogContent(change1Sid1, change4Sid2, change5Sid1);
}
/**
 * Scenario:
 * <ol>
 * <li>Replica 1 sends offline message</li>
 * <li>Replica 1 starts</li>
 * <li>Replica 1 publishes one change</li>
 * <li>Replica 1 publishes a second change</li>
 * <li>RS stops</li>
 * <li>RS starts</li>
 * </ol>
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
{
  final CSN offlineCSN = new CSN(1, 1, serverId1);
  final ReplicatedUpdateMsg change2Sid1 = msg(BASE_DN1, serverId1, 2);
  final ReplicatedUpdateMsg change3Sid1 = msg(BASE_DN1, serverId1, 3);
  final ReplicatedUpdateMsg change4Sid2 = msg(BASE_DN1, serverId2, 4);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  initialState.addOfflineReplica(BASE_DN1, offlineCSN);
  // published before the indexer is started: part of the initial state
  publishUpdateMsg(change2Sid1, change3Sid1);
  startCNIndexer();
  assertExternalChangelogContent();

  // MCP moves forward because serverId1 is not really offline
  // since we received a message from it newer than the offline replica msg
  publishUpdateMsg(change4Sid2);
  assertExternalChangelogContent(change2Sid1, change3Sid1);

  // back to normal operations
  sendHeartbeat(BASE_DN1, serverId1, 5);
  assertExternalChangelogContent(change2Sid1, change3Sid1, change4Sid2);
}
/**
 * Two replicas of the same domain, where the second one only ever sends a
 * single heartbeat and is never heard from again.
 */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneKilled() throws Exception
{
  final ReplicatedUpdateMsg change1Sid1 = msg(BASE_DN1, serverId1, 1);
  final ReplicatedUpdateMsg change3Sid1 = msg(BASE_DN1, serverId1, 3);

  eclEnabledDomains = Arrays.asList(BASE_DN1);
  addReplica(BASE_DN1, serverId1);
  addReplica(BASE_DN1, serverId2);
  startCNIndexer();
  assertExternalChangelogContent();

  publishUpdateMsg(change1Sid1);
  // MCP cannot move forward: no news yet from serverId2
  assertExternalChangelogContent();

  sendHeartbeat(BASE_DN1, serverId2, 2);
  // MCP moves forward: we know what serverId2 is at
  assertExternalChangelogContent(change1Sid1);

  publishUpdateMsg(change3Sid1);
  // MCP cannot move forward: serverId2 is the oldest CSN
  assertExternalChangelogContent(change1Sid1);
}
/**
 * Registers a replica in the fixture: creates its replica cursor, stubs the
 * mocked domain DB so that it returns the test cursors and newest CSNs, and
 * records the server id in the initial changelog state.
 * <p>
 * Domain and replica cursors are only wired for ECL enabled domains.
 *
 * @param baseDN
 *          the domain the replica belongs to
 * @param serverId
 *          the server id of the replica
 * @throws Exception
 *           if the cursors cannot be created or wired
 */
private void addReplica(DN baseDN, int serverId) throws Exception
{
  final SequentialDBCursor cursorForReplica = new SequentialDBCursor();
  replicaDBCursors.put(Pair.of(baseDN, serverId), cursorForReplica);

  if (isECLEnabledDomain2(baseDN))
  {
    DomainDBCursor cursorForDomain = domainDBCursors.get(baseDN);
    final boolean firstReplicaOfDomain = cursorForDomain == null;
    if (firstReplicaOfDomain)
    {
      // lazily create and register the cursor shared by all replicas of the domain
      cursorForDomain = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
      domainDBCursors.put(baseDN, cursorForDomain);
      multiDomainCursor.addDomain(baseDN, null);
      when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
          .thenReturn(cursorForDomain);
    }

    cursorForDomain.addReplicaDB(serverId, null);
    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
        .thenReturn(cursorForReplica);
  }

  final ServerState newestCSNs = getDomainNewestCSNs(baseDN);
  when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(newestCSNs);
  initialState.addServerIdToDomain(serverId, baseDN);
}
/**
 * Returns the server state registered for the provided domain, creating and
 * registering an empty one on first access.
 *
 * @param baseDN
 *          the domain to look up
 * @return the server state associated with the domain, never {@code null}
 */
private ServerState getDomainNewestCSNs(final DN baseDN)
{
  final ServerState existing = domainNewestCSNs.get(baseDN);
  if (existing != null)
  {
    return existing;
  }

  final ServerState created = new ServerState();
  domainNewestCSNs.put(baseDN, created);
  return created;
}
/**
 * Creates the change number indexer on top of the mocked changelog DB and the
 * initial state, starts it, then waits for it to reach a waiting state.
 * <p>
 * {@code isECLEnabledDomain()} is overridden so that the answer comes from
 * the domains listed by the running test.
 */
private void startCNIndexer()
{
  final ChangeNumberIndexer indexer = new ChangeNumberIndexer(changelogDB, initialState)
  {
    @Override
    protected boolean isECLEnabledDomain(DN baseDN)
    {
      return isECLEnabledDomain2(baseDN);
    }
  };

  // publish the reference first so that tearDown can stop it whatever happens next
  cnIndexer = indexer;
  indexer.start();
  waitForWaitingState(indexer);
}
/**
 * Tells whether the provided domain is one of the domains the running test
 * declared as ECL enabled.
 *
 * @param baseDN
 *          the domain to check
 * @return {@code true} if the domain is contained in the ECL enabled domains
 */
private boolean isECLEnabledDomain2(DN baseDN)
{
return eclEnabledDomains.contains(baseDN);
}
/**
 * Shuts down the change number indexer and waits for its thread to die.
 * Does nothing when no indexer was started.
 *
 * @throws Exception
 *           if waiting for the indexer thread fails
 */
private void stopCNIndexer() throws Exception
{
  if (cnIndexer == null)
  {
    // never started, or already stopped
    return;
  }

  cnIndexer.initiateShutdown();
  cnIndexer.join();
  cnIndexer = null;
}
/**
 * Builds an update message for the provided domain, whose CSN is made of the
 * provided time and server id, with a sequence number of 0.
 *
 * @param baseDN
 *          the domain of the change
 * @param serverId
 *          the server id of the replica publishing the change
 * @param time
 *          the time component of the CSN
 * @return the new update message
 */
private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
{
  final CSN changeCSN = new CSN(time, 0, serverId);
  return new ReplicatedUpdateMsg(baseDN, changeCSN);
}
/**
 * Builds a marker message flagged as "empty cursor" for the provided replica.
 * Its CSN only carries the server id (time and sequence number are 0).
 *
 * @param baseDN
 *          the domain of the replica
 * @param serverId
 *          the server id of the replica
 * @return the new marker message
 */
private ReplicatedUpdateMsg emptyCursor(DN baseDN, int serverId)
{
  final CSN markerCSN = new CSN(0, 0, serverId);
  return new ReplicatedUpdateMsg(baseDN, markerCSN, true);
}
/**
 * Simulates a change number index DB that already contains the provided
 * messages: the last one is returned as the newest record of the mocked index
 * DB and added to its replica cursor, and all of them are folded into the
 * initial cookie.
 *
 * @param msgs
 *          the messages considered already indexed, oldest first
 * @throws Exception
 *           if the mocked index DB cannot be stubbed
 */
private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
{
  if (msgs.length == 0)
  {
    return;
  }

  // Initialize the previous cookie that will be used to compare the records
  // added to the CNIndexDB at the end of this test
  final int lastIndex = msgs.length - 1;
  for (int i = 0; i < lastIndex; i++)
  {
    initialCookie.update(msgs[i].getBaseDN(), msgs[i].getCSN());
  }

  // the newest record carries the cookie as it was before its own change
  final ReplicatedUpdateMsg newestMsg = msgs[lastIndex];
  final DN newestBaseDN = newestMsg.getBaseDN();
  final CSN newestCSN = newestMsg.getCSN();
  when(cnIndexDB.getNewestRecord()).thenReturn(
      new ChangeNumberIndexRecord(initialCookie.toString(), newestBaseDN, newestCSN));

  final SequentialDBCursor replicaCursor =
      replicaDBCursors.get(Pair.of(newestBaseDN, newestCSN.getServerId()));
  replicaCursor.add(newestMsg);

  initialCookie.update(newestMsg.getBaseDN(), newestMsg.getCSN());
}
/**
 * Publishes the provided messages in two passes: first every message is
 * queued in the cursor of its replica ({@code null} being queued for empty
 * cursor markers), then every real message is either notified to the running
 * indexer or, when the indexer is not started yet, recorded in the newest
 * CSNs of its domain. Finally waits for the indexer, if any, to reach a
 * waiting state.
 *
 * @param msgs
 *          the messages to publish, in order
 * @throws Exception
 *           if publishing fails
 */
private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception
{
  for (ReplicatedUpdateMsg update : msgs)
  {
    final SequentialDBCursor replicaCursor =
        replicaDBCursors.get(Pair.of(update.getBaseDN(), update.getCSN().getServerId()));
    if (!update.isEmptyCursor())
    {
      replicaCursor.add(update);
    }
    else
    {
      replicaCursor.add(null);
    }
  }

  final boolean indexerRunning = cnIndexer != null;
  for (ReplicatedUpdateMsg update : msgs)
  {
    if (update.isEmptyCursor())
    {
      // markers only exist in the cursors, there is nothing to notify
      continue;
    }

    if (indexerRunning)
    {
      cnIndexer.publishUpdateMsg(update.getBaseDN(), update);
    }
    else
    {
      // we are only setting up initial state, update the domain newest CSNs
      getDomainNewestCSNs(update.getBaseDN()).update(update.getCSN());
    }
  }

  waitForWaitingState(cnIndexer);
}
/**
 * Publishes a heartbeat to the indexer for the provided replica, then waits
 * for the indexer to reach a waiting state.
 *
 * @param baseDN
 *          the domain of the replica
 * @param serverId
 *          the server id of the replica
 * @param time
 *          the time component of the heartbeat CSN
 * @throws Exception
 *           if publishing the heartbeat fails
 */
private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
{
  final CSN heartbeatCSN = new CSN(time, 0, serverId);
  cnIndexer.publishHeartbeat(baseDN, heartbeatCSN);
  waitForWaitingState(cnIndexer);
}
/**
 * Notifies the indexer that the provided replica went offline, then waits for
 * the indexer to reach a waiting state.
 *
 * @param baseDN
 *          the domain of the replica
 * @param serverId
 *          the server id of the replica
 * @param time
 *          the time component of the offline CSN
 * @throws Exception
 *           if the notification fails
 */
private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
{
  final CSN offlineCSN = new CSN(time, 0, serverId);
  cnIndexer.replicaOffline(baseDN, offlineCSN);
  waitForWaitingState(cnIndexer);
}
/**
 * Spins, yielding the CPU, until the provided thread is waiting, timed
 * waiting or terminated, then asserts that it did not terminate.
 *
 * @param t
 *          the thread to observe, may be {@code null} in which case this
 *          method returns immediately
 */
private void waitForWaitingState(final Thread t)
{
  if (t == null)
  { // not started yet, do not wait
    return;
  }

  State observed;
  for (observed = t.getState();
      !(observed == State.WAITING
          || observed == State.TIMED_WAITING
          || observed == State.TERMINATED);
      observed = t.getState())
  {
    Thread.yield();
  }
  // a terminated thread means the indexer died instead of going idle
  assertThat(observed).isIn(State.WAITING, State.TIMED_WAITING);
}
/**
 * Asserts which records have been added to the CNIndexDB since starting the
 * {@link ChangeNumberIndexer} thread.
 * <p>
 * Every record captured on the mocked index DB is compared, in order, with
 * the expected messages: same base DN, same CSN, and a previous cookie equal
 * to the initial cookie updated with all the preceding expected messages.
 *
 * @param expectedMsgs
 *          the messages expected to have been added, in order; none means the
 *          index DB must not have received any record
 * @throws Exception
 *           if the captured records cannot be checked
 */
private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
    throws Exception
{
  final ArgumentCaptor<ChangeNumberIndexRecord> arg =
      ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
  // atLeast(0) accepts any number of calls: this is only used to capture
  // the records, the actual count is asserted below
  verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
  final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();

  // clone initial state to avoid modifying it
  final MultiDomainServerState previousCookie =
      new MultiDomainServerState(initialCookie.toString());

  // check it was not called more than expected
  String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">";
  assertThat(allValues).as(desc1).hasSize(expectedMsgs.length);
  for (int i = 0; i < expectedMsgs.length; i++)
  {
    final ReplicatedUpdateMsg expectedMsg = expectedMsgs[i];
    final ChangeNumberIndexRecord record = allValues.get(i);
    // check content in order
    String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">";
    assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN());
    assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN());
    assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
    // the next record must carry a cookie that includes this change
    previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN());
  }
}
/**
 * Provides pairs of CSNs.
 * <p>
 * NOTE(review): judging by its name, each row is presumably (CSN, CSN
 * expected to immediately precede it); no test in the visible part of this
 * class consumes this provider - confirm whether it is still needed.
 *
 * @return the rows of the data provider
 */
@DataProvider
public Object[][] precedingCSNDataProvider()
{
  final int serverId = 42;
  final int t = 1000;

  final Object[] nullRow = { null, null, };
  final Object[] sameTimeRow = { new CSN(t, 1, serverId), new CSN(t, 0, serverId), };
  final Object[] previousTimeRow =
      { new CSN(t, 0, serverId), new CSN(t - 1, Integer.MAX_VALUE, serverId), };

  return new Object[][] { nullRow, sameTimeRow, previousTimeRow, };
}
}