opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,19 @@ } /** * Returns the ServerState associated to the provided replication domain's * baseDN. * * @param baseDN * the replication domain's baseDN * @return the associated ServerState */ public ServerState getServerState(DN baseDN) { return list.get(baseDN); } /** * Returns the CSN associated to the provided replication domain's baseDN and * serverId. * @@ -216,7 +229,7 @@ * the replication domain's baseDN * @param serverId * the serverId * @return the associated ServerState * @return the associated CSN */ public CSN getCSN(DN baseDN, int serverId) { opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -167,8 +167,6 @@ */ private int assuredTimeoutTimerPurgeCounter = 0; private ServerState ctHeartbeatState; /** * Creates a new ReplicationServerDomain associated to the baseDN. * @@ -2558,19 +2556,6 @@ } /** * Return the state that contain for each server the time of eligibility. * @return the state. */ public ServerState getChangeTimeHeartbeatState() { if (ctHeartbeatState == null) { ctHeartbeatState = getLatestServerState().duplicate(); } return ctHeartbeatState; } /** * Returns the oldest known state for the domain, made of the oldest CSN * stored for each serverId. * <p> @@ -2593,31 +2578,13 @@ * * @return the eligible CSN. */ public CSN getEligibleCSN() CSN getEligibleCSN() { CSN eligibleCSN = null; final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN); for (final CSN replicaNewestCSN : newestCSNs) for (final CSN lastAliveCSN : domainDB.getDomainLastAliveCSNs(baseDN)) { // Should it be considered for eligibility ? int serverId = replicaNewestCSN.getServerId(); CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId); // If the most recent UpdateMsg or CLHeartbeatMsg received is very old // then the domain is considered down and not considered for eligibility /* if ((heartbeatLastDN != null) && (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000)) { if (debugEnabled()) TRACER.debugInfo("In " + this.getName() + " Server " + sid + " is not considered for eligibility ... potentially down"); continue; } */ final int serverId = lastAliveCSN.getServerId(); if (!isServerConnected(serverId)) { if (debugEnabled()) @@ -2628,13 +2595,9 @@ continue; } if (eligibleCSN == null || replicaNewestCSN.isNewerThan(eligibleCSN)) if (eligibleCSN == null || lastAliveCSN.isNewerThan(eligibleCSN)) { eligibleCSN = replicaNewestCSN; } if (heartbeatLastCSN != null && heartbeatLastCSN.isNewerThan(eligibleCSN)) { eligibleCSN = heartbeatLastCSN; eligibleCSN = lastAliveCSN; } } @@ -2689,7 +2652,7 @@ try { storeReceivedCTHeartbeat(msg.getCSN()); domainDB.replicaHeartbeat(baseDN, msg.getCSN()); if (senderHandler.isDataServer()) { // If we are the first replication server warned, @@ -2722,17 +2685,6 @@ } /** * Store a change time value received from a data server. * @param csn The provided change time. */ public void storeReceivedCTHeartbeat(CSN csn) { // TODO:Maybe we can spare processing by only storing CSN (timestamp) // instead of a server state. getChangeTimeHeartbeatState().update(csn); } /** * This methods count the changes, server by server : * - from a serverState start point * - to (inclusive) an end point (the provided endCSN). opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -47,8 +47,8 @@ long getDomainChangesCount(DN baseDN); /** * Returns the oldest {@link CSN}s of each serverId for the specified * replication domain. * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in * the specified replication domain. * * @param baseDN * the replication domain baseDN @@ -59,8 +59,8 @@ ServerState getDomainOldestCSNs(DN baseDN); /** * Returns the newest {@link CSN}s of each serverId for the specified * replication domain. * Returns the newest {@link CSN}s from the replicaDBs for each serverId in * the specified replication domain. * * @param baseDN * the replication domain baseDN @@ -71,6 +71,18 @@ ServerState getDomainNewestCSNs(DN baseDN); /** * Returns the last time each serverId was seen alive for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => CSN} Map. Can be * null if the config that computes change numbers is set to false or * if domain is not replicated. */ ServerState getDomainLastAliveCSNs(DN baseDN); /** * Retrieves the latest trim date for the specified replication domain. * <p> * FIXME will be removed when ECLServerHandler will not be responsible anymore opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -101,12 +101,13 @@ private volatile CSN mediumConsistencyCSN; /** * Holds the most recent changes or heartbeats received for each serverIds * cross domain. changes are stored in the replicaDBs and hence persistent, * heartbeats are transient because they are easily constructed on normal * operations. * Holds the last time each replica was seen alive, whether via updates or * heartbeats received. Data is held for each serverId cross domain. * <p> * Updates are persistent and stored in the replicaDBs, heartbeats are * transient and are easily constructed on normal operations. */ private final MultiDomainServerState lastSeenUpdates = private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); @@ -174,7 +175,7 @@ */ public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) { lastSeenUpdates.update(baseDN, heartbeatCSN); lastAliveCSNs.update(baseDN, heartbeatCSN); tryNotify(baseDN); } @@ -192,13 +193,27 @@ throws ChangelogException { final CSN csn = updateMsg.getCSN(); lastSeenUpdates.update(baseDN, csn); lastAliveCSNs.update(baseDN, csn); // only keep the oldest CSN that will be the new cursor's starting point newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); tryNotify(baseDN); } /** * Returns the last time each serverId was seen alive for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => CSN} Map. Can be * null if domain is not replicated. */ public ServerState getDomainLastAliveCSNs(DN baseDN) { return lastAliveCSNs.getServerState(baseDN); } /** * Signals a replica went offline. * * @param baseDN @@ -208,7 +223,7 @@ */ public void replicaOffline(DN baseDN, CSN offlineCSN) { lastSeenUpdates.update(baseDN, offlineCSN); lastAliveCSNs.update(baseDN, offlineCSN); replicasOffline.update(baseDN, offlineCSN); tryNotify(baseDN); } @@ -234,8 +249,8 @@ if (mcCSN != null) { final int serverId = mcCSN.getServerId(); final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId); return mcCSN.isOlderThan(lastSeenSameServerId); CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId); return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive); } return true; } @@ -269,7 +284,7 @@ } ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); lastSeenUpdates.update(baseDN, latestKnownState); lastAliveCSNs.update(baseDN, latestKnownState); } resetNextChangeForInsertDBCursor(); @@ -491,7 +506,7 @@ if (offlineCSN != null && offlineCSN.isOlderThan(mediumConsistencyCSN) // If no new updates has been seen for this replica && lastSeenUpdates.removeCSN(baseDN, offlineCSN)) && lastAliveCSNs.removeCSN(baseDN, offlineCSN)) { removeCursor(baseDN, csn); replicasOffline.removeCSN(baseDN, offlineCSN); opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -539,6 +539,23 @@ /** {@inheritDoc} */ @Override public ServerState getDomainLastAliveCSNs(DN baseDN) { final ChangeNumberIndexer indexer = this.cnIndexer.get(); if (indexer != null) { final ServerState results = indexer.getDomainLastAliveCSNs(baseDN); if (results != null) { // return a copy to protect against concurrent modifications return results.duplicate(); } } return null; } /** {@inheritDoc} */ @Override public void removeDomain(DN baseDN) throws ChangelogException { // Remember the first exception because : @@ -791,8 +808,11 @@ @Override public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) { // TODO implement this when the changelogDB will be responsible for // maintaining the medium consistency point final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { indexer.publishHeartbeat(baseDN, heartbeatCSN); } } /** {@inheritDoc} */ opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -552,9 +552,8 @@ debugInfo(tn, "Starting test\n\n"); // root entry returned final InternalSearchOperation op = searchOnChangelog( "(objectclass=*)", Collections.<String>emptySet(), createControls(""), 1, tn); waitOpResult(op, ResultCode.SUCCESS); searchOnChangelog("(objectclass=*)", Collections.<String> emptySet(), createControls(""), 1, ResultCode.SUCCESS, tn); debugInfo(tn, "Ending test successfully"); } @@ -585,8 +584,7 @@ /** Add an entry in the database */ private void addEntry(Entry entry) throws Exception { AddOperation addOp = connection.processAdd(entry); waitOpResult(addOp, ResultCode.SUCCESS); waitOpResult(connection.processAdd(entry), ResultCode.SUCCESS); assertNotNull(getEntry(entry.getDN(), 1000, true)); } @@ -868,10 +866,8 @@ throws Exception { debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]"); final InternalSearchOperation searchOp = searchOnChangelog( filterString, ALL_ATTRIBUTES, createControls(cookie), expectedNbEntries, testName); waitOpResult(searchOp, expectedResultCode); return searchOp; return searchOnChangelog(filterString, ALL_ATTRIBUTES, createControls(cookie), expectedNbEntries, expectedResultCode, testName); } private InternalSearchOperation searchOnChangelog(String filterString, @@ -879,15 +875,13 @@ throws Exception { debugInfo(testName, " Search: " + filterString); final InternalSearchOperation searchOp = searchOnChangelog( filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, testName); waitOpResult(searchOp, expectedResultCode); return searchOp; return searchOnChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, expectedResultCode, testName); } private InternalSearchOperation searchOnChangelog(String filterString, Set<String> attributes, List<Control> controls, int expectedNbEntries, String testName) throws Exception ResultCode expectedResultCode, String testName) throws Exception { InternalSearchOperation op = null; int cnt = 0; @@ -912,6 +906,7 @@ final List<SearchResultEntry> entries = op.getSearchEntries(); assertThat(entries).hasSize(expectedNbEntries); debugAndWriteEntries(getLDIFWriter(), entries, testName); waitOpResult(op, expectedResultCode); return op; } @@ -2104,7 +2099,6 @@ ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN); debugInfo(tn, rsd1.getBaseDN() + " LatestServerState=" + rsd1.getLatestServerState() + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState() + " eligibleCSN=" + rsd1.getEligibleCSN() + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null)); // FIXME:ECL Enable this test by adding an assert on the right value @@ -2112,7 +2106,6 @@ ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2); debugInfo(tn, rsd2.getBaseDN() + " LatestServerState=" + rsd2.getLatestServerState() + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState() + " eligibleCSN=" + rsd2.getEligibleCSN() + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null)); // FIXME:ECL Enable this test by adding an assert on the right value @@ -2882,13 +2875,10 @@ } finally { final DeleteOperation delOp1 = connection.processDelete( DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2)); waitOpResult(delOp1, ResultCode.SUCCESS); final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2); waitOpResult(delOp2, ResultCode.SUCCESS); final DeleteOperation delOp3 = connection.processDelete(baseDN3); waitOpResult(delOp3, ResultCode.SUCCESS); final DN fionaDN = DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2); waitOpResult(connection.processDelete(fionaDN), ResultCode.SUCCESS); waitOpResult(connection.processDelete(TEST_ROOT_DN2), ResultCode.SUCCESS); waitOpResult(connection.processDelete(baseDN3), ResultCode.SUCCESS); remove(domain21, domain2, domain3); removeTestBackend(backend2, backend3);