OPENDJ-1231 Make the Medium Consistency Point support replica heartbeats
Code cleanup: removed code that has been obsoleted by having the separate ChangeNumberIndexer thread.
Matt and I think that this code was necessary because the change number was computed lazily.
Now that the change number is computed eagerly, there is no need for this obsolete code.
Additional potential work that might be done later:
- Could we remove the counter records from the replicaDBs?
- Could we remove ECLServerHandler.eligibleCSN()?
ReplicationServer.java:
In getECLChangeNumberLimits():
- removed the for loop inferring the "eligibleCount", i.e. the number of changes that are in the replicaDBs but not yet in the ChangeNumberIndexDB
- removed all the parameters which are now useless
ReplicationServerDomain.java, ReplicationDomainDB.java, JEChangelogDB.java, JEReplicaDB.java, ReplicationDB.java:
Transitively removed all the methods that were called by ReplicationServer.getECLChangeNumberLimits().
ECLServerHandler.java, FirstChangeNumberVirtualAttributeProvider.java, LastChangeNumberVirtualAttributeProvider.java:
Removed all the code that was getting and passing in the parameters of ReplicationServer.getECLChangeNumberLimits().
ExternalChangeLogTest.java:
Removed now useless test ECLReplicationServerFullTest10().
JEReplicaDBTest.java:
Renamed testDbCounts() to testGetOldestNewestCSNs().
Removed now useless test testGetCountNoCounterRecords()
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | |
| | | DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe!=null) |
| | | { |
| | | // Set a list of excluded domains (also exclude 'cn=changelog' itself) |
| | | Set<String> excludedDomains = |
| | | MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | final ReplicationServer rs = eclwe.getReplicationServer(); |
| | | final long[] limits = rs.getECLChangeNumberLimits( |
| | | rs.getEligibleCSN(excludedDomains), excludedDomains); |
| | | final long[] limits = rs.getECLChangeNumberLimits(); |
| | | value = String.valueOf(limits[0]); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | |
| | | DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe!=null) |
| | | { |
| | | // Set a list of excluded domains (also exclude 'cn=changelog' itself) |
| | | Set<String> excludedDomains = |
| | | MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | final ReplicationServer rs = eclwe.getReplicationServer(); |
| | | final long[] limits = rs.getECLChangeNumberLimits( |
| | | rs.getEligibleCSN(excludedDomains), excludedDomains); |
| | | final long[] limits = rs.getECLChangeNumberLimits(); |
| | | value = String.valueOf(limits[1]); |
| | | } |
| | | } |
| | |
| | | * Get the changeNumberLimits (from the eligibleCSN obtained at the start of |
| | | * this method) in order to have the oldest and newest change numbers. |
| | | */ |
| | | final long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | eligibleCSN, excludedBaseDNs); |
| | | final long[] limits = replicationServer.getECLChangeNumberLimits(); |
| | | final long oldestChangeNumber = limits[0]; |
| | | final long newestChangeNumber = limits[1]; |
| | | |
| | |
| | | |
| | | /** |
| | | * Get the oldest and newest change numbers. |
| | | * <p> |
| | | * Implementation detail (but it could be more than a detail): The newest |
| | | * change number seem to be a "potential" newest number. It adds up the |
| | | * newesst change number to the number of changes coming from a domain's |
| | | * ReplicaDBs. |
| | | * |
| | | * @param endCSN |
| | | * The CSN used as the upper limit when computing the newest change |
| | | * number |
| | | * @param excludedBaseDNs |
| | | * The baseDNs that are excluded from the ECL. |
| | | * @return an array of size 2 holding the oldest and newest change numbers at |
| | | * indexes 0 and 1. |
| | | * @throws DirectoryException |
| | | * When it happens. |
| | | */ |
| | | public long[] getECLChangeNumberLimits(CSN endCSN, |
| | | Set<String> excludedBaseDNs) throws DirectoryException |
| | | public long[] getECLChangeNumberLimits() throws DirectoryException |
| | | { |
| | | /* The content of the CNIndexDB depends on the SEARCH operations done before |
| | | * requesting the change number. If no operations, CNIndexDB is empty. |
| | | * The limits we want to get are the "potential" limits if a request was |
| | | * done, the CNIndexDB is probably not complete to do that. |
| | | * |
| | | * The oldest change number is : |
| | | * - the oldest record from the CNIndexDB |
| | | * - if none because CNIndexDB empty, |
| | | * then |
| | | * if no change in replchangelog then return 0 |
| | | * else return 1 (change number that WILL be returned to next search) |
| | | * |
| | | * The newest change number is : |
| | | * - initialized with the newest record from the CNIndexDB (0 if none) |
| | | * and consider the genState associated |
| | | * - to the newest change number, we add the count of updates in the |
| | | * replchangelog FROM that genState TO the crossDomainEligibleCSN |
| | | * (this diff is done domain by domain) |
| | | */ |
| | | try |
| | | { |
| | | final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); |
| | | final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); |
| | | final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord(); |
| | | |
| | | boolean dbEmpty = true; |
| | | long oldestChangeNumber = 0; |
| | | long newestChangeNumber = 0; |
| | | boolean noCookieForNewestCN = true; |
| | | CSN csnForNewestCN = null; |
| | | DN baseDNForNewestCN = null; |
| | | if (oldestRecord != null) |
| | | if (oldestRecord == null) |
| | | { |
| | | // The database is empty, just keep increasing numbers since last time |
| | | // we generated one change number. |
| | | final long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber(); |
| | | return new long[] { lastGeneratedCN, lastGeneratedCN }; |
| | | } |
| | | |
| | | final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord(); |
| | | if (newestRecord == null) |
| | | { |
| | | // Edge case: DB was cleaned or closed in between calls to |
| | | // Edge case: DB was cleaned (or purged) in between calls to |
| | | // getOldest*() and getNewest*(). |
| | | // The only remaining solution is to fail fast. |
| | | throw new ChangelogException( |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, |
| | | ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get()); |
| | | } |
| | | |
| | | dbEmpty = false; |
| | | oldestChangeNumber = oldestRecord.getChangeNumber(); |
| | | newestChangeNumber = newestRecord.getChangeNumber(); |
| | | |
| | | // Get the generalized state associated with the current newest change |
| | | // number and initializes from it the startStates table |
| | | final String cookie = newestRecord.getPreviousCookie(); |
| | | noCookieForNewestCN = cookie == null || cookie.length() == 0; |
| | | |
| | | csnForNewestCN = newestRecord.getCSN(); |
| | | baseDNForNewestCN = newestRecord.getBaseDN(); |
| | | } |
| | | |
| | | long newestTime = csnForNewestCN != null ? csnForNewestCN.getTime() : 0; |
| | | for (ReplicationServerDomain rsDomain : getReplicationServerDomains()) |
| | | { |
| | | if (contains( |
| | | excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString())) |
| | | continue; |
| | | |
| | | // for this domain, have the state in the replchangelog |
| | | // where the newest change number update is |
| | | long ec; |
| | | if (noCookieForNewestCN) |
| | | { |
| | | // Count changes of this domain from the beginning of the changelog |
| | | final ServerState startState = rsDomain.getOldestState() |
| | | .duplicateOnlyOlderThan(rsDomain.getLatestDomainTrimDate()); |
| | | ec = rsDomain.getEligibleCount(startState, endCSN); |
| | | } |
| | | else |
| | | { |
| | | // There are records in the CNIndexDB (so already returned to clients) |
| | | // BUT |
| | | // There is nothing related to this domain in the newest record |
| | | // (maybe this domain was disabled when this record was returned). |
| | | // In that case, are counted the changes from the time of the most |
| | | // recent change |
| | | |
| | | // And count changes of this domain from the date of the |
| | | // newest seqnum record (that does not refer to this domain) |
| | | CSN csnx = new CSN(newestTime, csnForNewestCN.getSeqnum(), 0); |
| | | ec = rsDomain.getEligibleCount(csnx, endCSN); |
| | | |
| | | if (baseDNForNewestCN.equals(rsDomain.getBaseDN())) |
| | | ec--; |
| | | } |
| | | |
| | | // cumulates on domains |
| | | newestChangeNumber += ec; |
| | | |
| | | // CNIndexDB is empty and there are eligible updates in the replication |
| | | // changelog then init oldest change number |
| | | if (ec > 0 && oldestChangeNumber == 0) |
| | | oldestChangeNumber = 1; |
| | | } |
| | | |
| | | if (dbEmpty) |
| | | { |
| | | // The database was empty, just keep increasing numbers since last time |
| | | // we generated one change number. |
| | | long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber(); |
| | | oldestChangeNumber += lastGeneratedCN; |
| | | newestChangeNumber += lastGeneratedCN; |
| | | } |
| | | return new long[] { oldestChangeNumber, newestChangeNumber }; |
| | | return new long[] { oldestRecord.getChangeNumber(), |
| | | newestRecord.getChangeNumber() }; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Count the number of changes in the replication changelog for the provided |
| | | * serverID, between 2 provided CSNs. |
| | | * @param serverId Identifier of the server for which to compute the count. |
| | | * @param from lower limit CSN. |
| | | * @param to upper limit CSN. |
| | | * @return the number of changes. |
| | | */ |
| | | public long getCount(int serverId, CSN from, CSN to) |
| | | { |
| | | return domainDB.getCount(baseDN, serverId, from, to); |
| | | } |
| | | |
| | | /** |
| | | * Returns the change count for that ReplicationServerDomain. |
| | | * |
| | | * @return the change count. |
| | |
| | | } |
| | | |
| | | /** |
| | | * This methods count the changes, server by server : |
| | | * - from a serverState start point |
| | | * - to (inclusive) an end point (the provided endCSN). |
| | | * @param startState The provided start server state. |
| | | * @param endCSN The provided end CSN. |
| | | * @return The number of changes between startState and endCSN. |
| | | */ |
| | | public long getEligibleCount(ServerState startState, CSN endCSN) |
| | | { |
| | | long res = 0; |
| | | |
| | | for (CSN csn : getLatestServerState()) |
| | | { |
| | | CSN startCSN = startState.getCSN(csn.getServerId()); |
| | | long serverIdRes = getCount(csn.getServerId(), startCSN, endCSN); |
| | | |
| | | // The startPoint is excluded when counting the ECL eligible changes |
| | | if (startCSN != null && serverIdRes > 0) |
| | | { |
| | | serverIdRes--; |
| | | } |
| | | |
| | | res += serverIdRes; |
| | | } |
| | | return res; |
| | | } |
| | | |
| | | /** |
| | | * This methods count the changes, server by server: |
| | | * - from a start CSN |
| | | * - to (inclusive) an end point (the provided endCSN). |
| | | * @param startCSN The provided start CSN. |
| | | * @param endCSN The provided end CSN. |
| | | * @return The number of changes between startTime and endCSN. |
| | | */ |
| | | public long getEligibleCount(CSN startCSN, CSN endCSN) |
| | | { |
| | | long res = 0; |
| | | for (CSN csn : getLatestServerState()) |
| | | { |
| | | int serverId = csn.getServerId(); |
| | | CSN lStartCSN = |
| | | new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId); |
| | | res += getCount(serverId, lStartCSN, endCSN); |
| | | } |
| | | return res; |
| | | } |
| | | |
| | | /** |
| | | * Get the latest (more recent) trim date of the changelog dbs associated |
| | | * to this domain. |
| | | * @return The latest trim date. |
| | |
| | | // serverId methods |
| | | |
| | | /** |
| | | * Return the number of changes inclusive between 2 provided {@link CSN}s for |
| | | * the specified serverId and replication domain. i.e. the <code>from</code> |
| | | * and <code>to</code> CSNs are included in the count. |
| | | * <p> |
| | | * Note that: |
| | | * <ol> |
| | | * <li>If <code>from</code> is null, the count starts at the oldest CSN in the |
| | | * database.</li> |
| | | * <li>If <code>to</code> is null, the count is 0.</li> |
| | | * <li>If both from and to are present, then the count includes them both |
| | | * <code>to</code> is null, the count ends at the newest CSN in the database. |
| | | * </li> |
| | | * <li>incidentally, if both <code>from</code> and <code>to</code> are null, |
| | | * the total count of entries in the replica database is returned.</li> |
| | | * </ol> |
| | | * <h6>Example</h6> |
| | | * <p> |
| | | * Given the following replica database for baseDN "dc=example,dc=com" and |
| | | * serverId 1: |
| | | * |
| | | * <pre> |
| | | * CSN1 <= Oldest |
| | | * CSN2 |
| | | * CSN3 |
| | | * CSN4 |
| | | * CSN5 <= Newest |
| | | * </pre> |
| | | * |
| | | * Then: |
| | | * |
| | | * <pre> |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5); |
| | | * </pre> |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param serverId |
| | | * the serverId on which to act |
| | | * @param from |
| | | * The older CSN where to start the count |
| | | * @param to |
| | | * The newer CSN where to end the count |
| | | * @return The computed number of changes |
| | | */ |
| | | long getCount(DN baseDN, int serverId, CSN from, CSN to); |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the replicaDBs for the specified |
| | | * replication domain, with all cursors starting after the provided CSN. |
| | | * <p> |
| | |
| | | StaticUtils.recursiveDelete(dbDirectory); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getCount(DN baseDN, int serverId, CSN from, CSN to) |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | return replicaDB.getCount(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | |
| | | db.setCounterRecordWindowSize(size); |
| | | } |
| | | |
| | | /** |
| | | * Return the number of changes between 2 provided CSNs. |
| | | * This a alternative to traverseAndCount, expected to be much more efficient |
| | | * when there is a huge number of changes in the Db. |
| | | * @param from The lower (older) CSN. |
| | | * @param to The upper (newer) CSN. |
| | | * @return The computed number of changes. |
| | | */ |
| | | public long getCount(CSN from, CSN to) |
| | | { |
| | | // Now that we always keep the last CSN in the DB to avoid expiring cookies |
| | | // too quickly, we need to check if the "to" is older than the trim date. |
| | | if (to == null || !to.isOlderThan(new CSN(latestTrimDate, 0, 0))) |
| | | { |
| | | flush(); |
| | | return db.count(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | public class ReplicationDB |
| | | { |
| | | private static final int START = 0; |
| | | private static final int STOP = 1; |
| | | |
| | | private Database db; |
| | | private ReplicationDbEnv dbenv; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Count the number of changes between 2 changes numbers (inclusive). |
| | | * @param start The lower limit of the count. |
| | | * @param stop The higher limit of the count. |
| | | * @return The number of changes between provided start and stop CSN. |
| | | * Returns 0 when an error occurs. |
| | | */ |
| | | public long count(CSN start, CSN stop) |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | // If the DB has been closed then return immediately. |
| | | if (isDBClosed()) |
| | | { |
| | | return 0; |
| | | } |
| | | if (start == null && stop == null) |
| | | { |
| | | return db.count(); |
| | | } |
| | | |
| | | int[] counterValues = new int[2]; |
| | | int[] distanceToCounterRecords = new int[2]; |
| | | |
| | | // Step 1 : from the start point, traverse db to the next counter record |
| | | // or to the stop point. |
| | | findFirstCounterRecordAfterStartPoint(start, stop, counterValues, |
| | | distanceToCounterRecords); |
| | | |
| | | // cases |
| | | if (counterValues[START] == 0) |
| | | return distanceToCounterRecords[START]; |
| | | |
| | | // Step 2 : from the stop point, traverse db to the next counter record |
| | | // or to the start point. |
| | | if (!findFirstCounterRecordBeforeStopPoint(start, stop, counterValues, |
| | | distanceToCounterRecords)) |
| | | { |
| | | // database is empty |
| | | return 0; |
| | | } |
| | | |
| | | // Step 3 : Now consolidates the result |
| | | return computeDistance(counterValues, distanceToCounterRecords); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | dbenv.shutdownOnException(e); |
| | | } |
| | | finally |
| | | { |
| | | dbCloseLock.readLock().unlock(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | private void findFirstCounterRecordAfterStartPoint(CSN start, CSN stop, |
| | | int[] counterValues, int[] distanceToCounterRecords) |
| | | throws DatabaseException |
| | | { |
| | | Cursor cursor = db.openCursor(null, null); |
| | | try |
| | | { |
| | | OperationStatus status; |
| | | DatabaseEntry key; |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (start != null) |
| | | { |
| | | key = createReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | // JNR: I suspect this is equivalent to writing cursor.getFirst(). |
| | | // If it is, then please change the code to make it clearer. |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[START] = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | |
| | | // it is a regular change record |
| | | if (csn.isNewerThan(stop)) |
| | | { // we are outside the range: we reached the 'stop' target |
| | | break; |
| | | } |
| | | |
| | | distanceToCounterRecords[START]++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | // loop to update the distance and possibly find a counter record |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private boolean findFirstCounterRecordBeforeStopPoint(CSN start, CSN stop, |
| | | int[] counterValues, int[] distanceToCounterRecords) |
| | | throws DatabaseException |
| | | { |
| | | Cursor cursor = db.openCursor(null, null); |
| | | try |
| | | { |
| | | DatabaseEntry key = createReplicationKey(stop); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[STOP] = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | |
| | | // it is a regular change record |
| | | if (csn.isOlderThan(start)) |
| | | { // we are outside the range: we reached the 'start' target |
| | | break; |
| | | } |
| | | |
| | | distanceToCounterRecords[STOP]++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | // loop to update the distance and possibly find a counter record |
| | | } |
| | | return true; |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The diagram below shows a visual description of how the distance between |
| | | * two CSNs in the database is computed. |
| | | * |
| | | * <pre> |
| | | * +--------+ +--------+ |
| | | * | CASE 1 | | CASE 2 | |
| | | * +--------+ +--------+ |
| | | * |
| | | * CSN CSN |
| | | * ----- ----- |
| | | * START => ----- START => ----- |
| | | * ^ ----- ^ ----- |
| | | * | ----- | ----- |
| | | * dist 1 ----- dist 1 ----- |
| | | * | ----- | ----- |
| | | * v ----- v ----- |
| | | * CR 1&2 => [1000] CR 1 => [1000] |
| | | * ^ ----- ----- |
| | | * | ----- ----- |
| | | * dist 2 ----- ----- |
| | | * | ----- ----- |
| | | * v ----- ----- |
| | | * STOP => ----- ----- |
| | | * ----- ----- |
| | | * CR => [2000] CR 2 => [2000] |
| | | * ----- ^ ----- |
| | | * | ----- |
| | | * dist 2 ----- |
| | | * | ----- |
| | | * v ----- |
| | | * STOP => ----- |
| | | * </pre> |
| | | * |
| | | * Explanation of the terms used: |
| | | * <dl> |
| | | * <dt>START</dt> |
| | | * <dd>Start CSN for the count</dd> |
| | | * <dt>STOP</dt> |
| | | * <dd>Stop CSN for the count</dd> |
| | | * <dt>dist</dt> |
| | | * <dd>Distance from START (or STOP) to the counter record</dd> |
| | | * <dt>CSN</dt> |
| | | * <dd>Stands for "Change Sequence Number". Below it, the database is |
| | | * symbolized, where each record is represented by using dashes "-----". The |
| | | * database is ordered.</dd> |
| | | * <dt>CR</dt> |
| | | * <dd>Stands for "Counter Record". Counter Records are inserted in the |
| | | * database along with real CSNs, but they are not real changes. They are only |
| | | * used to speed up calculating the distance between 2 CSNs without the need |
| | | * to scan the whole database in between.</dd> |
| | | * </dl> |
| | | */ |
| | | private long computeDistance(int[] counterValues, |
| | | int[] distanceToCounterRecords) |
| | | { |
| | | if (counterValues[START] != 0) |
| | | { |
| | | if (counterValues[START] == counterValues[STOP]) |
| | | { |
| | | // only one counter record between from and to - no need to use it |
| | | return distanceToCounterRecords[START] + distanceToCounterRecords[STOP]; |
| | | } |
| | | // at least 2 counter records between from and to |
| | | return distanceToCounterRecords[START] |
| | | + (counterValues[STOP] - counterValues[START]) |
| | | + distanceToCounterRecords[STOP]; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Whether a provided CSN represents a counter record. A counter record is |
| | | * used to store the time. |
| | | * |
| | |
| | | import org.opends.server.tools.LDAPWriter; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.LDIFWriter; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | |
| | | ECLSimultaneousPsearches(); |
| | | } |
| | | |
| | | @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) |
| | | public void ECLReplicationServerFullTest10() throws Exception |
| | | { |
| | | // Test eligible count method. |
| | | ECLGetEligibleCountTest(); |
| | | } |
| | | |
| | | // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned |
| | | // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned |
| | | // TODO:ECL Test invalid DN in cookie returns UNWILLING + message |
| | |
| | | return null; |
| | | } |
| | | |
| | | private void ECLGetEligibleCountTest() throws Exception |
| | | { |
| | | String tn = "ECLGetEligibleCountTest"; |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | String user1entryUUID = "11111111-1112-1113-1114-111111111115"; |
| | | |
| | | final CSN[] csns = generateCSNs(4, SERVER_ID_1); |
| | | final CSN csn1 = csns[0]; |
| | | final CSN csn2 = csns[1]; |
| | | final CSN csn3 = csns[2]; |
| | | |
| | | getCNIndexDB().setPurgeDelay(0); |
| | | ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN); |
| | | // this empty state will force to count from the start of the DB |
| | | final ServerState fromStart = new ServerState(); |
| | | |
| | | // The replication changelog is empty |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, csns[0]), 0); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 1000, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | // Publish one first message |
| | | DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | Thread.sleep(300); |
| | | |
| | | // From begin to now : 1 change |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, now()), 1); |
| | | |
| | | // Publish one second message |
| | | delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn2, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | |
| | | // From begin to now : 2 changes |
| | | searchOnChangelog("(changenumber>=1)", 2, tn, SUCCESS); |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, now()), 2); |
| | | |
| | | // From begin to first change (inclusive) : 1 change = csn1 |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, csn1), 1); |
| | | |
| | | final ServerState fromStateBeforeCSN1 = new ServerState(); |
| | | fromStateBeforeCSN1.update(csn1); |
| | | |
| | | // From state/csn1(exclusive) to csn1 (inclusive) : 0 change |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN1, csn1), 0); |
| | | |
| | | // From state/csn1(exclusive) to csn2 (inclusive) : 1 change = csn2 |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN1, csn2), 1); |
| | | |
| | | final ServerState fromStateBeforeCSN2 = new ServerState(); |
| | | fromStateBeforeCSN2.update(csn2); |
| | | |
| | | // From state/csn2(exclusive) to now (inclusive) : 0 change |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN2, now()), 0); |
| | | |
| | | // Publish one third message |
| | | delMsg = newDeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, csn3, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | Thread.sleep(300); |
| | | |
| | | fromStateBeforeCSN2.update(csn2); |
| | | |
| | | // From state/csn2(exclusive) to now : 1 change = csn3 |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN2, now()), 1); |
| | | |
| | | boolean perfs=false; |
| | | if (perfs) |
| | | { |
| | | // number of msgs used by the test |
| | | final int maxMsg = 999999; |
| | | |
| | | // We need an RS configured with a window size bigger than the number |
| | | // of msg used by the test. |
| | | assertTrue(maxMsg<maxWindow); |
| | | debugInfo(tn, "Perf test in compat mode - will generate " + maxMsg + " msgs."); |
| | | for (int i=4; i<=maxMsg; i++) |
| | | { |
| | | CSN csnx = new CSN(TimeThread.getTime(), i, SERVER_ID_1); |
| | | delMsg = newDeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, csnx, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | } |
| | | Thread.sleep(1000); |
| | | debugInfo(tn, "Perfs test in compat - search lastChangeNumber"); |
| | | Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | long t1 = TimeThread.getTime(); |
| | | long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | replicationServer.getEligibleCSN(excludedDomains), excludedDomains); |
| | | assertEquals(limits[1], maxMsg); |
| | | long t2 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - " + maxMsg + " counted in (ms):" + (t2 - t1)); |
| | | |
| | | String filter = "(changenumber>=" + maxMsg + ")"; |
| | | searchOnChangelog(filter, 1, tn, SUCCESS); |
| | | long t3 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - last change searched in (ms):" + (t3 - t2)); |
| | | |
| | | filter = "(changenumber>=" + maxMsg + ")"; |
| | | searchOnChangelog(filter, 1, tn, SUCCESS); |
| | | long t4 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - last change searched in (ms):" + (t4 - t3)); |
| | | |
| | | filter = "(changenumber>=" + (maxMsg - 2) + ")"; |
| | | searchOnChangelog(filter, 3, tn, SUCCESS); |
| | | long t5 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - last 3 changes searched in (ms):" + (t5 - t4)); |
| | | } |
| | | stop(server01); |
| | | debugInfo(tn, "Ending test with success"); |
| | | } |
| | | |
| | | private CSN now() |
| | | { |
| | | return new CSN(TimeThread.getTime(), 1, SERVER_ID_1); |
| | | } |
| | | |
| | | /** |
| | | * Test ECl entry attributes, and there configuration. |
| | | */ |
| | |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetCountNoCounterRecords() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5); |
| | | for (CSN csn : csns) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | replicaDB.flush(); |
| | | |
| | | assertEquals(replicaDB.getCount(csns[0], csns[0]), 1); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[1]), 2); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[4]), 5); |
| | | assertEquals(replicaDB.getCount(null, csns[4]), 5); |
| | | assertEquals(replicaDB.getCount(csns[0], null), 0); |
| | | assertEquals(replicaDB.getCount(null, null), 5); |
| | | } |
| | | finally |
| | | { |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test the logic that manages counter records in the JEReplicaDB in order to |
| | | * optimize the counting of record in the replication changelog db. |
| | | * optimize the oldest and newest records in the replication changelog db. |
| | | */ |
| | | @Test(enabled=true, groups = { "opendj-256" }) |
| | | void testDbCounts() throws Exception |
| | | void testGetOldestNewestCSNs() throws Exception |
| | | { |
| | | // FIXME: for some reason this test is always failing in Jenkins when run as |
| | | // part of the unit tests. Here is the output (the failure is 100% |
| | | // reproducible and always has the same value of 3004): |
| | | // |
| | | // Failed Test: |
| | | // org.opends.server.replication.server.JEReplicaDBTest#testDbCounts |
| | | // [testng] Failure Cause: java.lang.AssertionError: AFTER PURGE |
| | | // expected:<8000> but was:<3004> |
| | | // [testng] org.testng.Assert.fail(Assert.java:84) |
| | | // [testng] org.testng.Assert.failNotEquals(Assert.java:438) |
| | | // [testng] org.testng.Assert.assertEquals(Assert.java:108) |
| | | // [testng] org.testng.Assert.assertEquals(Assert.java:323) |
| | | // [testng] |
| | | // org.opends.server.replication.server.JEReplicaDBTest.testDBCount(JEReplicaDBTest.java:594) |
| | | // [testng] |
| | | // org.opends.server.replication.server.JEReplicaDBTest.testDbCounts(JEReplicaDBTest.java:389) |
| | | |
| | | // It's worth testing with 2 different setting for counterRecord |
| | | // - a counter record is put every 10 Update msg in the db - just a unit |
| | | // setting. |
| | |
| | | // - when start and stop are after the first counter record, |
| | | // - when start and stop are before and after more than one counter record, |
| | | // After a purge. |
| | | // After shutdowning/closing and reopening the db. |
| | | testDBCount(40, 10); |
| | | // FIXME next line is the one failing with the stacktrace above |
| | | testDBCount(4000, 1000); |
| | | // After shutting down/closing and reopening the db. |
| | | testGetOldestNewestCSNs(40, 10); |
| | | testGetOldestNewestCSNs(4000, 1000); |
| | | } |
| | | |
| | | private void testDBCount(int max, int counterWindow) throws Exception |
| | | private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception |
| | | { |
| | | String tn = "testDBCount("+max+","+counterWindow+")"; |
| | | debugInfo(tn, "Starting test"); |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | // Test count in different subcases trying to handle all special cases |
| | | // regarding the 'counter' record and 'count' algorithm |
| | | assertCount(tn, replicaDB, csns[1], csns[1], 1, "FROM change1 TO change1 "); |
| | | assertCount(tn, replicaDB, csns[1], csns[2], 2, "FROM change1 TO change2 "); |
| | | assertCount(tn, replicaDB, csns[1], csns[counterWindow], counterWindow, |
| | | "FROM change1 TO counterWindow=" + counterWindow); |
| | | |
| | | final int j = counterWindow + 1; |
| | | assertCount(tn, replicaDB, csns[1], csns[j], j, |
| | | "FROM change1 TO counterWindow+1=" + j); |
| | | final int k = 2 * counterWindow; |
| | | assertCount(tn, replicaDB, csns[1], csns[k], k, |
| | | "FROM change1 TO 2*counterWindow=" + k); |
| | | final int l = k + 1; |
| | | assertCount(tn, replicaDB, csns[1], csns[l], l, |
| | | "FROM change1 TO 2*counterWindow+1=" + l); |
| | | assertCount(tn, replicaDB, csns[2], csns[5], 4, |
| | | "FROM change2 TO change5 "); |
| | | assertCount(tn, replicaDB, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4, |
| | | "FROM counterWindow+2 TO counterWindow+5 "); |
| | | assertCount(tn, replicaDB, csns[2], csns[(counterWindow + 5)], counterWindow + 4, |
| | | "FROM change2 TO counterWindow+5 "); |
| | | assertCount(tn, replicaDB, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1, |
| | | "FROM counterWindow+4 TO counterWindow+4 "); |
| | | |
| | | CSN olderThanOldest = null; |
| | | CSN newerThanNewest = new CSN(System.currentTimeMillis() + (2*(max+1)), 100, 1); |
| | | |
| | | // Now we want to test with start and stop outside of the db |
| | | |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | assertCount(tn, replicaDB, olderThanOldest, newerThanNewest, max, |
| | | "FROM null (start of time) TO now (> newest change in the db)"); |
| | | |
| | | // Now we want to test that after closing and reopening the db, the |
| | | // counting algo is well reinitialized and when new messages are added |
| | | // the new counter are correctly generated. |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | |
| | | // Populate the db with 'max' msg |
| | | for (int i=max+1; i<=(2*max); i++) |
| | | { |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, 2 * max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | |
| | | // |
| | | |
| | | replicaDB.setPurgeDelay(100); |
| | | sleep(4000); |
| | | long totalCount = replicaDB.getCount(null, null); |
| | | debugInfo(tn, "FROM our first generated change TO now (> newest change in the db)" + " After purge, total count=" + totalCount); |
| | | sleep(1000); |
| | | |
| | | String testcase = "AFTER PURGE (oldest, newest)="; |
| | | debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN()); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest="); |
| | | |
| | | int expectedCnt; |
| | | if (totalCount>1) |
| | | { |
| | | final int newestSeqnum = replicaDB.getNewestCSN().getSeqnum(); |
| | | final int oldestSeqnum = replicaDB.getOldestCSN().getSeqnum(); |
| | | expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1; |
| | | } |
| | | else |
| | | { |
| | | expectedCnt = 0; |
| | | } |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE"); |
| | | |
| | | // Clear ... |
| | | debugInfo(tn,"clear:"); |
| | | replicaDB.clear(); |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertCount(String tn, JEReplicaDB replicaDB, CSN from, CSN to, |
| | | int expectedCount, String testcase) |
| | | { |
| | | long actualCount = replicaDB.getCount(from, to); |
| | | debugInfo(tn, testcase + " actualCount=" + actualCount); |
| | | assertEquals(actualCount, expectedCount, testcase); |
| | | } |
| | | |
| | | } |