/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.lang.Thread.State; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.types.DN; import org.testng.annotations.*; 
import com.forgerock.opendj.util.Pair; import static org.assertj.core.api.Assertions.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; /** * Test for ChangeNumberIndexer class. All dependencies to the changelog DB * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS * does to compute a changeNumber. The tests setup various topologies with their * replicas. *

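* <p>
* All tests are written with this layout:
* <ol>
* <li>declare the ECL-enabled domains and register the replicas (plus any
* initial changelog state) while the change number indexer is not running,</li>
* <li>start the change number indexer,</li>
* <li>publish update messages, heartbeats or replica offline messages, and
* assert the content of the external changelog after each step.</li>
* </ol>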
*/ @SuppressWarnings("javadoc") public class ChangeNumberIndexerTest extends DirectoryServerTestCase { private static final class ReplicatedUpdateMsg extends UpdateMsg { private final DN baseDN; private final boolean emptyCursor; public ReplicatedUpdateMsg(DN baseDN, CSN csn) { this(baseDN, csn, false); } public ReplicatedUpdateMsg(DN baseDN, CSN csn, boolean emptyCursor) { super(csn, null); this.baseDN = baseDN; this.emptyCursor = emptyCursor; } public DN getBaseDN() { return baseDN; } public boolean isEmptyCursor() { return emptyCursor; } /** {@inheritDoc} */ @Override public String toString() { return "UpdateMsg(" + "\"" + baseDN + " " + getCSN().getServerId() + "\"" + ", csn=" + getCSN().toStringUI() + ")"; } } private static DN BASE_DN1; private static DN BASE_DN2; private static DN ADMIN_DATA_DN; private static final int serverId1 = 101; private static final int serverId2 = 102; private static final int serverId3 = 103; @Mock private ChangelogDB changelogDB; @Mock private ChangeNumberIndexDB cnIndexDB; @Mock private ReplicationDomainDB domainDB; private List eclEnabledDomains; private MultiDomainDBCursor multiDomainCursor; private Map, SequentialDBCursor> replicaDBCursors; private Map domainDBCursors; private ChangelogState initialState; private Map domainNewestCSNs; private ChangeNumberIndexer cnIndexer; private MultiDomainServerState initialCookie; @BeforeClass public static void classSetup() throws Exception { TestCaseUtils.startFakeServer(); BASE_DN1 = DN.decode("dc=example,dc=com"); BASE_DN2 = DN.decode("dc=world,dc=company"); ADMIN_DATA_DN = DN.decode("cn=admin data"); } @AfterClass public static void classTearDown() throws Exception { TestCaseUtils.shutdownFakeServer(); } @BeforeMethod public void setup() throws Exception { MockitoAnnotations.initMocks(this); multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY); initialState = new ChangelogState(); initialCookie = new MultiDomainServerState(); replicaDBCursors = new HashMap, 
SequentialDBCursor>(); domainDBCursors = new HashMap(); domainNewestCSNs = new HashMap(); when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY))) .thenReturn(multiDomainCursor); } @AfterMethod public void tearDown() throws Exception { stopCNIndexer(); } private static final String EMPTY_DB_NO_DS = "emptyDBNoDS"; @Test public void emptyDBNoDS() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); startCNIndexer(); assertExternalChangelogContent(); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBOneDS() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); assertExternalChangelogContent(msg1); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void nonEmptyDBOneDS() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); addReplica(BASE_DN1, serverId1); setCNIndexDBInitialRecords(msg1); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); publishUpdateMsg(msg2); assertExternalChangelogContent(msg2); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSs() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); startCNIndexer(); assertExternalChangelogContent(); // simulate messages received out of order final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); // do not start publishing to the changelog until we hear from serverId1 assertExternalChangelogContent(); publishUpdateMsg(msg1); 
assertExternalChangelogContent(msg1); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsDifferentDomains() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN2, serverId2); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2); publishUpdateMsg(msg1, msg2); assertExternalChangelogContent(msg1); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); assertExternalChangelogContent(msg1, msg2); } /** * This test tries to reproduce a very subtle implementation bug where: *
    *
  1. the change number indexer has no more records to proceed, because all * cursors are exhausted, so it calls wait()
  2. *
  3. a new change Upd1 comes in for an exhausted cursor, * medium consistency cannot move
  4. *
  5. a new change Upd2 comes in for a cursor that is not already opened, * medium consistency can move, so wake up the change number indexer
  6. *
  7. on wake up, the change number indexer calls next(), * advancing the CompositeDBCursor, which recycles the exhausted cursor, * then calls next() on it, making it lose its change. * CompositeDBCursor currentRecord == Upd1.
  8. *
  9. on the next iteration of the loop in run(), a new cursor is created, * triggering the creation of a new CompositeDBCursor => Upd1 is lost. * CompositeDBCursor currentRecord == Upd2.
  10. *
*/ @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); assertExternalChangelogContent(msg1); addReplica(BASE_DN1, serverId2); sendHeartbeat(BASE_DN1, serverId2, 2); assertExternalChangelogContent(msg1); // publish change that will not trigger a wake up of change number indexer, // but will make it open a cursor on next wake up final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); assertExternalChangelogContent(msg1); // wake up change number indexer final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); assertExternalChangelogContent(msg1, msg2); sendHeartbeat(BASE_DN1, serverId2, 4); // assert no changes have been lost assertExternalChangelogContent(msg1, msg2, msg3); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void nonEmptyDBTwoDSs() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); setCNIndexDBInitialRecords(msg1, msg2); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); publishUpdateMsg(msg3, msg4); assertExternalChangelogContent(msg3); final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5); publishUpdateMsg(msg5); assertExternalChangelogContent(msg3); final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6); publishUpdateMsg(msg6); assertExternalChangelogContent(msg3, msg4, msg5); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() 
throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1); final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2); final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN1, serverId1, 2); final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN1, serverId2, 3); // simulate no messages received during some time for replica 2 publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1); assertExternalChangelogContent(msg1Sid2, msg2Sid1); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(ADMIN_DATA_DN, serverId1); addReplica(BASE_DN1, serverId2); addReplica(BASE_DN1, serverId3); startCNIndexer(); assertExternalChangelogContent(); // cn=admin data will does not participate in the external changelog // so it cannot add to it final ReplicatedUpdateMsg msg1 = msg(ADMIN_DATA_DN, serverId1, 1); publishUpdateMsg(msg1); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId3, 3); publishUpdateMsg(msg2, msg3); assertExternalChangelogContent(msg2); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBOneInitialDSAnotherDSJoining() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); assertExternalChangelogContent(msg1); addReplica(BASE_DN1, serverId2); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); assertExternalChangelogContent(msg1); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); 
assertExternalChangelogContent(msg1, msg2); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); addReplica(BASE_DN1, serverId2); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); assertExternalChangelogContent(msg1); sendHeartbeat(BASE_DN1, serverId1, 3); assertExternalChangelogContent(msg1, msg2); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg1, msg2); assertExternalChangelogContent(msg1); sendHeartbeat(BASE_DN1, serverId1, 3); assertExternalChangelogContent(msg1, msg2); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneGoingOffline() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg1, msg2); assertExternalChangelogContent(msg1); replicaOffline(BASE_DN1, serverId2, 3); // MCP cannot move forward since no new updates from serverId1 assertExternalChangelogContent(msg1); final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); publishUpdateMsg(msg4); // MCP moves forward after receiving update from serverId1 // (last replica in the domain) assertExternalChangelogContent(msg1, msg2, 
msg4); // serverId2 comes online again final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5); publishUpdateMsg(msg5); // MCP does not move until it knows what happens to serverId1 assertExternalChangelogContent(msg1, msg2, msg4); sendHeartbeat(BASE_DN1, serverId1, 6); // MCP moves forward assertExternalChangelogContent(msg1, msg2, msg4, msg5); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneInitiallyOffline() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); // MCP does not wait for temporarily offline serverId1 assertExternalChangelogContent(msg2); // serverId1 is back online, wait for changes from serverId2 final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); assertExternalChangelogContent(msg2); final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); publishUpdateMsg(msg4); // MCP moves forward assertExternalChangelogContent(msg2, msg3); } /** * Scenario: *
    *
  1. Replica 1 publishes one change
  2. *
  3. Replica 1 sends offline message
  4. *
  5. RS stops
  6. *
  7. RS starts
  8. *
*/ @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1)); startCNIndexer(); // blocked until we receive info for serverId2 assertExternalChangelogContent(); sendHeartbeat(BASE_DN1, serverId2, 3); // MCP moves forward assertExternalChangelogContent(msg1); // do not wait for temporarily offline serverId1 final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); publishUpdateMsg(msg4); assertExternalChangelogContent(msg1, msg4); // serverId1 is back online, wait for changes from serverId2 final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5); publishUpdateMsg(msg5); assertExternalChangelogContent(msg1, msg4); final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6); publishUpdateMsg(msg6); // MCP moves forward assertExternalChangelogContent(msg1, msg4, msg5); } /** * Scenario: *
    *
  1. Replica 1 sends offline message
  2. *
  3. Replica 1 starts
  4. *
  5. Replica 1 publishes one change
  6. *
  7. Replica 1 publishes a second change
  8. *
  9. RS stops
  10. *
  11. RS starts
  12. *
*/ @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg2, msg3); startCNIndexer(); assertExternalChangelogContent(); // MCP moves forward because serverId1 is not really offline // since we received a message from it newer than the offline replica msg final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); publishUpdateMsg(msg4); assertExternalChangelogContent(msg2, msg3); // back to normal operations sendHeartbeat(BASE_DN1, serverId1, 5); assertExternalChangelogContent(msg2, msg3, msg4); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneKilled() throws Exception { eclEnabledDomains = Arrays.asList(BASE_DN1); addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); startCNIndexer(); assertExternalChangelogContent(); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); // MCP cannot move forward: no news yet from serverId2 assertExternalChangelogContent(); sendHeartbeat(BASE_DN1, serverId2, 2); // MCP moves forward: we know what serverId2 is at assertExternalChangelogContent(msg1); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); // MCP cannot move forward: serverId2 is the oldest CSN assertExternalChangelogContent(msg1); } private void addReplica(DN baseDN, int serverId) throws Exception { final SequentialDBCursor replicaDBCursor = new SequentialDBCursor(); replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor); if (isECLEnabledDomain2(baseDN)) { DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); if (domainDBCursor == null) { domainDBCursor = 
new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY); domainDBCursors.put(baseDN, domainDBCursor); multiDomainCursor.addDomain(baseDN, null); when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY))) .thenReturn(domainDBCursor); } domainDBCursor.addReplicaDB(serverId, null); when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY))) .thenReturn(replicaDBCursor); } when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( getDomainNewestCSNs(baseDN)); initialState.addServerIdToDomain(serverId, baseDN); } private ServerState getDomainNewestCSNs(final DN baseDN) { ServerState serverState = domainNewestCSNs.get(baseDN); if (serverState == null) { serverState = new ServerState(); domainNewestCSNs.put(baseDN, serverState); } return serverState; } private void startCNIndexer() { cnIndexer = new ChangeNumberIndexer(changelogDB, initialState) { @Override protected boolean isECLEnabledDomain(DN baseDN) { return isECLEnabledDomain2(baseDN); } }; cnIndexer.start(); waitForWaitingState(cnIndexer); } private boolean isECLEnabledDomain2(DN baseDN) { return eclEnabledDomains.contains(baseDN); } private void stopCNIndexer() throws Exception { if (cnIndexer != null) { cnIndexer.initiateShutdown(); cnIndexer.join(); cnIndexer = null; } } private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time) { return new ReplicatedUpdateMsg(baseDN, new CSN(time, 0, serverId)); } private ReplicatedUpdateMsg emptyCursor(DN baseDN, int serverId) { return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true); } private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... 
msgs) throws Exception { // Initialize the previous cookie that will be used to compare the records // added to the CNIndexDB at the end of this test for (int i = 0; i < msgs.length; i++) { ReplicatedUpdateMsg msg = msgs[i]; if (i + 1 == msgs.length) { final ReplicatedUpdateMsg newestMsg = msg; final DN baseDN = newestMsg.getBaseDN(); final CSN csn = newestMsg.getCSN(); when(cnIndexDB.getNewestRecord()).thenReturn( new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn)); final SequentialDBCursor cursor = replicaDBCursors.get(Pair.of(baseDN, csn.getServerId())); cursor.add(newestMsg); } initialCookie.update(msg.getBaseDN(), msg.getCSN()); } } private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception { for (ReplicatedUpdateMsg msg : msgs) { final SequentialDBCursor cursor = replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); if (msg.isEmptyCursor()) { cursor.add(null); } else { cursor.add(msg); } } for (ReplicatedUpdateMsg msg : msgs) { if (!msg.isEmptyCursor()) { if (cnIndexer != null) { // indexer is running cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg); } else { // we are only setting up initial state, update the domain newest CSNs getDomainNewestCSNs(msg.getBaseDN()).update(msg.getCSN()); } } } waitForWaitingState(cnIndexer); } private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception { cnIndexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId)); waitForWaitingState(cnIndexer); } private void replicaOffline(DN baseDN, int serverId, int time) throws Exception { cnIndexer.replicaOffline(baseDN, new CSN(time, 0, serverId)); waitForWaitingState(cnIndexer); } private void waitForWaitingState(final Thread t) { if (t == null) { // not started yet, do not wait return; } State state = t.getState(); while (!state.equals(State.WAITING) && !state.equals(State.TIMED_WAITING) && !state.equals(State.TERMINATED)) { Thread.yield(); state = t.getState(); } assertThat(state).isIn(State.WAITING, 
State.TIMED_WAITING); } /** * Asserts which records have been added to the CNIndexDB since starting the * {@link ChangeNumberIndexer} thread. */ private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs) throws Exception { final ArgumentCaptor arg = ArgumentCaptor.forClass(ChangeNumberIndexRecord.class); verify(cnIndexDB, atLeast(0)).addRecord(arg.capture()); final List allValues = arg.getAllValues(); // clone initial state to avoid modifying it final MultiDomainServerState previousCookie = new MultiDomainServerState(initialCookie.toString()); // check it was not called more than expected String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">"; assertThat(allValues).as(desc1).hasSize(expectedMsgs.length); for (int i = 0; i < expectedMsgs.length; i++) { final ReplicatedUpdateMsg expectedMsg = expectedMsgs[i]; final ChangeNumberIndexRecord record = allValues.get(i); // check content in order String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">"; assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN()); assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN()); assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString()); previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN()); } } @DataProvider public Object[][] precedingCSNDataProvider() { final int serverId = 42; final int t = 1000; return new Object[][] { // @formatter:off { null, null, }, { new CSN(t, 1, serverId), new CSN(t, 0, serverId), }, { new CSN(t, 0, serverId), new CSN(t - 1, Integer.MAX_VALUE, serverId), }, // @formatter:on }; } }