| | |
| | | } |
| | | |
| | | /** |
| | | * Removes the mapping to the provided CSN if it is present in this |
| | | * MultiDomainServerState. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain's baseDN |
| | | * @param expectedCSN |
| | | * the CSN to be removed |
| | | * @return true if the CSN could be removed, false otherwise. |
| | | */ |
| | | public boolean removeCSN(DN baseDN, CSN expectedCSN) |
| | | { |
| | | ServerState ss = list.get(baseDN); |
| | | if (ss != null) |
| | | { |
| | | return ss.removeCSN(expectedCSN); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Test if this object equals the provided other object. |
| | | * @param other The other object with which we want to test equality. |
| | | * @return Returns True if this equals other, else return false. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Removes the mapping to the provided CSN if it is present in this |
| | | * ServerState. |
| | | * |
| | | * @param expectedCSN |
| | | * the CSN to be removed |
| | | * @return true if the CSN could be removed, false otherwise. |
| | | */ |
| | | public boolean removeCSN(CSN expectedCSN) |
| | | { |
| | | if (expectedCSN == null) |
| | | return false; |
| | | |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (Iterator<CSN> iter = serverIdToCSN.values().iterator(); |
| | | iter.hasNext();) |
| | | { |
| | | final CSN csn = iter.next(); |
| | | if (expectedCSN.equals(csn)) |
| | | { |
| | | iter.remove(); |
| | | saved = false; |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Replace the Server State with another ServerState. |
| | | * |
| | | * @param serverState The ServerState. |
| | |
| | | /* |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. This solution also avoids |
| | | * using a queue that could fill up before we have consumed all its content. |
| | | */ |
| | | /** |
| | | * Holds the cross domain medium consistency Replication Update Vector for the |
| | |
| | | */ |
| | | private final MultiDomainServerState lastSeenUpdates = |
| | | new MultiDomainServerState(); |
| | | private final MultiDomainServerState replicasOffline = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Composite cursor across all the replicaDBs for all the replication domains. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Signals a replica went offline. |
| | | * <p> |
| | | * The offline CSN is recorded for the domain in lastSeenUpdates first, then |
| | | * in replicasOffline, and finally tryNotify(baseDN) is invoked. |
| | | * |
| | | * @param baseDN |
| | | * the replica's replication domain |
| | | * @param offlineCSN |
| | | * the serverId and time of the replica that went offline |
| | | */ |
| | | public void replicaOffline(DN baseDN, CSN offlineCSN) |
| | | { |
| | | lastSeenUpdates.update(baseDN, offlineCSN); |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | tryNotify(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Notifies the Change number indexer thread if it will be able to do some |
| | | * work. |
| | | */ |
| | |
| | | final CSN mcCSN = mediumConsistencyCSN; |
| | | if (mcCSN != null) |
| | | { |
| | | final CSN lastSeenSameServerId = |
| | | lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId()); |
| | | final int serverId = mcCSN.getServerId(); |
| | | final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId); |
| | | return mcCSN.isOlderThan(lastSeenSameServerId); |
| | | } |
| | | return true; |
| | |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(baseDN, csn); |
| | | mediumConsistencyCSN = csn; |
| | | final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId()); |
| | | if (offlineCSN != null |
| | | && offlineCSN.isOlderThan(mediumConsistencyCSN) |
| | | // If no new updates has been seen for this replica |
| | | && lastSeenUpdates.removeCSN(baseDN, offlineCSN)) |
| | | { |
| | | removeCursor(baseDN, csn); |
| | | replicasOffline.removeCSN(baseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | private void removeCursor(final DN baseDN, final CSN csn) |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors |
| | | .entrySet()) |
| | | { |
| | | if (baseDN.equals(entry.getKey())) |
| | | { |
| | | final Set<Integer> serverIds = entry.getValue().keySet(); |
| | | for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();) |
| | | { |
| | | final int serverId = iter.next(); |
| | | if (csn.getServerId() == serverId) |
| | | { |
| | | iter.remove(); |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void createNewCursors() throws ChangelogException |
| | |
| | | assertEquals(state.toString(), expected); |
| | | } |
| | | |
| | | @Test |
| | | public void testUpdateMultiDomainServerState() throws Exception |
| | | { |
| | | final DN dn1 = DN.decode("o=test1"); |
| | | final DN dn2 = DN.decode("o=test2"); |
| | | |
| | | final MultiDomainServerState state1 = new MultiDomainServerState(); |
| | | state1.update(dn1, csn3); |
| | | state1.update(dn2, csn2); |
| | | final MultiDomainServerState state2 = new MultiDomainServerState(); |
| | | state2.update(state1); |
| | | |
| | | assertSame(csn3, state2.getCSN(dn1, csn3.getServerId())); |
| | | assertSame(csn2, state2.getCSN(dn2, csn2.getServerId())); |
| | | assertTrue(state1.equalsTo(state2)); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { "testUpdateCSN" }) |
| | | public void testEqualsTo() throws Exception |
| | | { |
| | |
| | | assertTrue(state.isEmpty()); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { "testUpdateCSN" }) |
| | | public void testRemoveCSN() throws Exception |
| | | { |
| | | final DN dn1 = DN.decode("o=test1"); |
| | | final DN dn2 = DN.decode("o=test2"); |
| | | final DN dn3 = DN.decode("o=test3"); |
| | | |
| | | final MultiDomainServerState state = new MultiDomainServerState(); |
| | | |
| | | assertTrue(state.update(dn1, csn1)); |
| | | assertTrue(state.update(dn2, csn1)); |
| | | assertTrue(state.update(dn2, csn2)); |
| | | assertNull(state.getCSN(dn3, 42)); |
| | | |
| | | assertFalse(state.removeCSN(dn3, csn1)); |
| | | assertSame(csn1, state.getCSN(dn1, csn1.getServerId())); |
| | | assertSame(csn1, state.getCSN(dn2, csn1.getServerId())); |
| | | assertSame(csn2, state.getCSN(dn2, csn2.getServerId())); |
| | | |
| | | assertFalse(state.removeCSN(dn1, csn2)); |
| | | assertSame(csn1, state.getCSN(dn1, csn1.getServerId())); |
| | | assertSame(csn1, state.getCSN(dn2, csn1.getServerId())); |
| | | assertSame(csn2, state.getCSN(dn2, csn2.getServerId())); |
| | | |
| | | assertTrue(state.removeCSN(dn2, csn1)); |
| | | assertSame(csn1, state.getCSN(dn1, csn1.getServerId())); |
| | | assertNull(state.getCSN(dn2, csn1.getServerId())); |
| | | assertSame(csn2, state.getCSN(dn2, csn2.getServerId())); |
| | | } |
| | | } |
| | |
| | | assertTrue(state.cover(csn1Server2)); |
| | | assertFalse(state.cover(csn0Server3)); |
| | | } |
| | | |
| | | @Test |
| | | public void testRemoveCSN() throws Exception |
| | | { |
| | | final CSN csn1Server1 = new CSN(1, 0, 1); |
| | | final CSN csn2Server1 = new CSN(2, 0, 1); |
| | | final CSN csn1Server2 = new CSN(1, 0, 2); |
| | | |
| | | final ServerState state = new ServerState(); |
| | | assertTrue(state.update(csn1Server1)); |
| | | |
| | | assertFalse(state.removeCSN(null)); |
| | | assertFalse(state.removeCSN(csn2Server1)); |
| | | assertFalse(state.removeCSN(csn1Server2)); |
| | | assertTrue(state.removeCSN(csn1Server1)); |
| | | } |
| | | } |
| | |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | |
| | | assertAddedRecords(msg1); |
| | | } |
| | | |
| | |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2); |
| | | publishUpdateMsg(msg2); |
| | | |
| | | assertAddedRecords(msg2); |
| | | } |
| | | |
| | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg2, msg1); |
| | | |
| | | assertAddedRecords(msg1); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception |
| | | public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | |
| | | final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3); |
| | | // simulate no messages received during some time for replica 2 |
| | | publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1); |
| | | |
| | | assertAddedRecords(msg1Sid2, msg2Sid1); |
| | | } |
| | | |
| | |
| | | assertAddedRecords(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg1, msg2); |
| | | assertAddedRecords(msg1); |
| | | |
| | | sendHeartbeat(BASE_DN, serverId1, 3); |
| | | assertAddedRecords(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg1, msg2); |
| | | assertAddedRecords(msg1); |
| | | |
| | | replicaOffline(BASE_DN, serverId2, 3); |
| | | // MCP cannot move forward since no new updates from serverId1 |
| | | assertAddedRecords(msg1); |
| | | |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4); |
| | | publishUpdateMsg(msg4); |
| | | // MCP moved forward after receiving update from serverId1 |
| | | // (last replica in the domain) |
| | | assertAddedRecords(msg1, msg2, msg4); |
| | | } |
| | | |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | |
| | | waitForWaitingState(indexer); |
| | | } |
| | | |
| | | private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception |
| | | { |
| | | indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId)); |
| | | waitForWaitingState(indexer); |
| | | } |
| | | |
| | | private void replicaOffline(DN baseDN, int serverId, int time) throws Exception |
| | | { |
| | | indexer.replicaOffline(baseDN, new CSN(time, 0, serverId)); |
| | | waitForWaitingState(indexer); |
| | | } |
| | | |
| | | private void waitForWaitingState(final Thread t) |
| | | { |
| | | State state = t.getState(); |