| | |
| | | import java.util.Map; |
| | | |
| | | import org.mockito.ArgumentCaptor; |
| | | import org.mockito.Mock; |
| | | import org.mockito.MockitoAnnotations; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | |
| | | import static org.mockito.Matchers.*; |
| | | import static org.mockito.Mockito.*; |
| | | |
| | | /** |
| | | * Test for ChangeNumberIndexer class. All dependencies to the changelog DB |
| | | * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS |
| | | * does to compute a changeNumber. The tests setup various topologies with their |
| | | * replicas. |
| | | * <p> |
| | | * All tests are written with this layout: |
| | | * <ul> |
| | | * <li>Initial setup where RS is stopped. Data are set into the changelog state |
| | | * DB, the replica DBs and the change number index DB.</li> |
| | | * <li>Simulate RS startup by calling {@link #startCNIndexer(DN...)}. This will |
| | | * start the change number indexer thread that will start computing change |
| | | * numbers and inserting them in the change number index db.</li> |
| | | * <li>Send events to the change number indexer thread by publishing update |
| | | * messages, sending heartbeat messages or replica offline messages.</li> |
| | | * </ul> |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class ChangeNumberIndexerTest extends DirectoryServerTestCase |
| | | { |
| | |
| | | private static final int serverId2 = 102; |
| | | private static final int serverId3 = 103; |
| | | |
| | | @Mock |
| | | private ChangelogDB changelogDB; |
| | | @Mock |
| | | private ChangeNumberIndexDB cnIndexDB; |
| | | @Mock |
| | | private ReplicationDomainDB domainDB; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors = |
| | | new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors; |
| | | private ChangelogState initialState; |
| | | private Map<DN, ServerState> domainNewestCSNs; |
| | | private ChangeNumberIndexer cnIndexer; |
| | | private MultiDomainServerState initialCookie; |
| | | |
| | |
| | | @BeforeMethod |
| | | public void setup() throws Exception |
| | | { |
| | | changelogDB = mock(ChangelogDB.class); |
| | | cnIndexDB = mock(ChangeNumberIndexDB.class); |
| | | domainDB = mock(ReplicationDomainDB.class); |
| | | MockitoAnnotations.initMocks(this); |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | domainNewestCSNs = new HashMap<DN, ServerState>(); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | |
| | | { |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | setDBInitialRecords(msg1); |
| | | setCNIndexDBInitialRecords(msg1); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | | publishUpdateMsg(msg2); |
| | |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // simulate messages received out of order |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN2, serverId2); |
| | | startCNIndexer(BASE_DN1, BASE_DN2); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2); |
| | |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | setDBInitialRecords(msg1, msg2); |
| | | setCNIndexDBInitialRecords(msg1, msg2); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); |
| | |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1); |
| | | final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2); |
| | |
| | | addReplica(BASE_DN1, serverId2); |
| | | addReplica(BASE_DN1, serverId3); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // cn=admin data does not participate in the external changelog |
| | | // so it cannot add to it |
| | |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | |
| | | assertExternalChangelogContent(msg1, msg2, msg4, msg5); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyOffline() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | publishUpdateMsg(msg2); |
| | | // MCP does not wait for temporarily offline serverId1 |
| | | assertExternalChangelogContent(msg2); |
| | | |
| | | // serverId1 is back online, wait for changes from serverId2 |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); |
| | | publishUpdateMsg(msg3); |
| | | assertExternalChangelogContent(msg2); |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); |
| | | publishUpdateMsg(msg4); |
| | | // MCP moves forward |
| | | assertExternalChangelogContent(msg2, msg3); |
| | | } |
| | | |
| | | /** |
| | | * Scenario: |
| | | * <ol> |
| | | * <li>Replica 1 publishes one change</li> |
| | | * <li>Replica 1 sends offline message</li> |
| | | * <li>RS stops</li> |
| | | * <li>RS starts</li> |
| | | * </ol> |
| | | */ |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | |
| | | // blocked until we receive info for serverId2 |
| | | assertExternalChangelogContent(); |
| | | |
| | | sendHeartbeat(BASE_DN1, serverId2, 3); |
| | | // MCP moves forward |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | // do not wait for temporarily offline serverId1 |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); |
| | | publishUpdateMsg(msg3); |
| | | assertExternalChangelogContent(msg1, msg3); |
| | | |
| | | // serverId1 is back online, wait for changes from serverId2 |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); |
| | | publishUpdateMsg(msg4); |
| | | assertExternalChangelogContent(msg1, msg3); |
| | | |
| | | final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5); |
| | | publishUpdateMsg(msg5); |
| | | // MCP moves forward |
| | | assertExternalChangelogContent(msg1, msg3, msg4); |
| | | } |
| | | |
| | | /** |
| | | * Scenario: |
| | | * <ol> |
| | | * <li>Replica 1 sends offline message</li> |
| | | * <li>Replica 1 starts</li> |
| | | * <li>Replica 1 publishes one change</li> |
| | | * <li>Replica 1 publishes a second change</li> |
| | | * <li>RS stops</li> |
| | | * <li>RS starts</li> |
| | | * </ol> |
| | | */ |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); |
| | | publishUpdateMsg(msg2, msg3); |
| | | startCNIndexer(BASE_DN1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // MCP moves forward because serverId1 is not really offline |
| | | // since because we received a message from it after the offline replica msg |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); |
| | | publishUpdateMsg(msg4); |
| | | assertExternalChangelogContent(msg2, msg3); |
| | | |
| | | // back to normal operations |
| | | sendHeartbeat(BASE_DN1, serverId1, 4); |
| | | assertExternalChangelogContent(msg2, msg3, msg4); |
| | | } |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | | cursors.put(Pair.of(baseDN, serverId), cursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(cursor); |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState()); |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( |
| | | getDomainNewestCSNs(baseDN)); |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | |
| | | private ServerState getDomainNewestCSNs(final DN baseDN) |
| | | { |
| | | ServerState serverState = domainNewestCSNs.get(baseDN); |
| | | if (serverState == null) |
| | | { |
| | | serverState = new ServerState(); |
| | | domainNewestCSNs.put(baseDN, serverState); |
| | | } |
| | | return serverState; |
| | | } |
| | | |
| | | private void startCNIndexer(DN... eclEnabledDomains) |
| | | { |
| | | final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains); |
| | |
| | | if (cnIndexer != null) |
| | | { |
| | | cnIndexer.initiateShutdown(); |
| | | cnIndexer.interrupt(); |
| | | cnIndexer.join(); |
| | | cnIndexer = null; |
| | | } |
| | | } |
| | | |
| | |
| | | return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true); |
| | | } |
| | | |
| | | private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception |
| | | private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception |
| | | { |
| | | // Initialize the previous cookie that will be used to compare the records |
| | | // added to the CNIndexDB at the end of this test |
| | |
| | | { |
| | | if (!msg.isEmptyCursor()) |
| | | { |
| | | cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg); |
| | | if (cnIndexer != null) |
| | | { |
| | | // indexer is running |
| | | cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg); |
| | | } |
| | | else |
| | | { |
| | | // we are only setting up initial state, update the domain newest CSNs |
| | | getDomainNewestCSNs(msg.getBaseDN()).update(msg.getCSN()); |
| | | } |
| | | } |
| | | } |
| | | waitForWaitingState(cnIndexer); |
| | |
| | | |
| | | private void waitForWaitingState(final Thread t) |
| | | { |
| | | if (t == null) |
| | | { // not started yet, do not wait |
| | | return; |
| | | } |
| | | State state = t.getState(); |
| | | while (!state.equals(State.WAITING) |
| | | && !state.equals(State.TIMED_WAITING) |
| | |
| | | @Test(dataProvider = "precedingCSNDataProvider") |
| | | public void getPrecedingCSN(CSN start, CSN expected) |
| | | { |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState); |
| | | CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start); |
| | | assertThat(precedingCSN).isEqualTo(expected); |
| | | } |