| | |
| | | } |
| | | |
| | | private static DN BASE_DN; |
| | | private static DN ADMIN_DATA_DN; |
| | | private static final int serverId1 = 101; |
| | | private static final int serverId2 = 102; |
| | | private static final int serverId3 = 103; |
| | | |
| | | private ChangelogDB changelogDB; |
| | | private ChangeNumberIndexDB cnIndexDB; |
| | |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors = |
| | | new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | private ChangelogState initialState; |
| | | private ChangeNumberIndexer indexer; |
| | | private ChangeNumberIndexer cnIndexer; |
| | | private MultiDomainServerState initialCookie; |
| | | |
| | | @BeforeClass |
| | |
| | | { |
| | | TestCaseUtils.startFakeServer(); |
| | | BASE_DN = DN.decode("dc=example,dc=com"); |
| | | ADMIN_DATA_DN = DN.decode("cn=admin data"); |
| | | } |
| | | |
| | | @AfterClass |
| | |
| | | @AfterMethod |
| | | public void tearDown() throws Exception |
| | | { |
| | | stopIndexer(); |
| | | stopCNIndexer(); |
| | | } |
| | | |
| | | private static final String EMPTY_DB_NO_DS = "emptyDBNoDS"; |
| | |
| | | @Test |
| | | public void emptyDBNoDS() throws Exception |
| | | { |
| | | startIndexer(); |
| | | verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class)); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDS() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | assertAddedRecords(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | addReplica(BASE_DN, serverId1); |
| | | setDBInitialRecords(msg1); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2); |
| | | publishUpdateMsg(msg2); |
| | | assertAddedRecords(msg2); |
| | | assertExternalChangelogContent(msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg2, msg1); |
| | | assertAddedRecords(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | setDBInitialRecords(msg1, msg2); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3); |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4); |
| | | publishUpdateMsg(msg3, msg4); |
| | | assertAddedRecords(msg3); |
| | | assertExternalChangelogContent(msg3); |
| | | |
| | | final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5); |
| | | publishUpdateMsg(msg5); |
| | | assertAddedRecords(msg3); |
| | | assertExternalChangelogContent(msg3); |
| | | |
| | | final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6); |
| | | publishUpdateMsg(msg6); |
| | | assertAddedRecords(msg3, msg4, msg5); |
| | | assertExternalChangelogContent(msg3, msg4, msg5); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1); |
| | | final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2); |
| | |
| | | final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3); |
| | | // simulate no messages received during some time for replica 2 |
| | | publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1); |
| | | assertAddedRecords(msg1Sid2, msg2Sid1); |
| | | assertExternalChangelogContent(msg1Sid2, msg2Sid1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception |
| | | { |
| | | addReplica(ADMIN_DATA_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | addReplica(BASE_DN, serverId3); |
| | | startCNIndexer(); |
| | | |
| | | // cn=admin data will does not participate in the external changelog |
| | | // so it cannot add to it |
| | | final ReplicatedUpdateMsg msg1 = msg(ADMIN_DATA_DN, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3); |
| | | publishUpdateMsg(msg2, msg3); |
| | | assertExternalChangelogContent(msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | |
| | | addReplica(BASE_DN, serverId2); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg2); |
| | | assertAddedRecords(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3); |
| | | publishUpdateMsg(msg3); |
| | | assertAddedRecords(msg1, msg2); |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg1, msg2); |
| | | assertAddedRecords(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | sendHeartbeat(BASE_DN, serverId1, 3); |
| | | assertAddedRecords(msg1, msg2); |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | startCNIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg1, msg2); |
| | | assertAddedRecords(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | replicaOffline(BASE_DN, serverId2, 3); |
| | | // MCP cannot move forward since no new updates from serverId1 |
| | | assertAddedRecords(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4); |
| | | publishUpdateMsg(msg4); |
| | | // MCP moved forward after receiving update from serverId1 |
| | | // (last replica in the domain) |
| | | assertAddedRecords(msg1, msg2, msg4); |
| | | assertExternalChangelogContent(msg1, msg2, msg4); |
| | | } |
| | | |
| | | |
| | |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | |
| | | private void startIndexer() |
| | | private void startCNIndexer() |
| | | { |
| | | indexer = new ChangeNumberIndexer(changelogDB, initialState); |
| | | indexer.start(); |
| | | waitForWaitingState(indexer); |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState) |
| | | { |
| | | @Override |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | | return BASE_DN.equals(baseDN); |
| | | } |
| | | }; |
| | | cnIndexer.start(); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private void stopIndexer() |
| | | private void stopCNIndexer() |
| | | { |
| | | indexer.initiateShutdown(); |
| | | cnIndexer.initiateShutdown(); |
| | | } |
| | | |
| | | private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time) |
| | |
| | | { |
| | | if (!msg.isEmptyCursor()) |
| | | { |
| | | indexer.publishUpdateMsg(msg.getBaseDN(), msg); |
| | | cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg); |
| | | } |
| | | } |
| | | waitForWaitingState(indexer); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception |
| | | { |
| | | indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId)); |
| | | waitForWaitingState(indexer); |
| | | cnIndexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId)); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private void replicaOffline(DN baseDN, int serverId, int time) throws Exception |
| | | { |
| | | indexer.replicaOffline(baseDN, new CSN(time, 0, serverId)); |
| | | waitForWaitingState(indexer); |
| | | cnIndexer.replicaOffline(baseDN, new CSN(time, 0, serverId)); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private void waitForWaitingState(final Thread t) |
| | |
| | | * Asserts which records have been added to the CNIndexDB since starting the |
| | | * {@link ChangeNumberIndexer} thread. |
| | | */ |
| | | private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception |
| | | private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs) |
| | | throws Exception |
| | | { |
| | | if (msgs.length == 0) |
| | | { |
| | | verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class)); |
| | | return; |
| | | } |
| | | |
| | | final ArgumentCaptor<ChangeNumberIndexRecord> arg = |
| | | ArgumentCaptor.forClass(ChangeNumberIndexRecord.class); |
| | | verify(cnIndexDB, atLeast(0)).addRecord(arg.capture()); |
| | |
| | | final ChangeNumberIndexRecord record = allValues.get(i); |
| | | if (previousCookie.isEmpty()) |
| | | { |
| | | // ugly hack to go round strange legacy code |
| | | // ugly hack to go round strange legacy code @see OPENDJ-67 |
| | | previousCookie.replace(record.getBaseDN(), new ServerState()); |
| | | } |
| | | // check content in order |