| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | |
| | | { |
| | | ECLWorkflowElement eclwe = (ECLWorkflowElement) |
| | | DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe!=null) |
| | | if (eclwe != null) |
| | | { |
| | | // Set a list of excluded domains (also exclude 'cn=changelog' itself) |
| | | Set<String> excludedDomains = |
| | | MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | final ReplicationServer rs = eclwe.getReplicationServer(); |
| | | final long[] limits = rs.getECLChangeNumberLimits( |
| | | rs.getEligibleCSN(excludedDomains), excludedDomains); |
| | | final long[] limits = rs.getECLChangeNumberLimits(); |
| | | value = String.valueOf(limits[0]); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | |
| | | { |
| | | ECLWorkflowElement eclwe = (ECLWorkflowElement) |
| | | DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe!=null) |
| | | if (eclwe != null) |
| | | { |
| | | // Set a list of excluded domains (also exclude 'cn=changelog' itself) |
| | | Set<String> excludedDomains = |
| | | MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | final ReplicationServer rs = eclwe.getReplicationServer(); |
| | | final long[] limits = rs.getECLChangeNumberLimits( |
| | | rs.getEligibleCSN(excludedDomains), excludedDomains); |
| | | final long[] limits = rs.getECLChangeNumberLimits(); |
| | | value = String.valueOf(limits[1]); |
| | | } |
| | | } |
| | |
| | | * Get the changeNumberLimits (from the eligibleCSN obtained at the start of |
| | | * this method) in order to have the oldest and newest change numbers. |
| | | */ |
| | | final long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | eligibleCSN, excludedBaseDNs); |
| | | final long[] limits = replicationServer.getECLChangeNumberLimits(); |
| | | final long oldestChangeNumber = limits[0]; |
| | | final long newestChangeNumber = limits[1]; |
| | | |
| | |
| | | |
| | | /** |
| | | * Get the oldest and newest change numbers. |
| | | * <p> |
| | | * Implementation detail (but it could be more than a detail): The newest |
| | | * change number seem to be a "potential" newest number. It adds up the |
| | | * newesst change number to the number of changes coming from a domain's |
| | | * ReplicaDBs. |
| | | * |
| | | * @param endCSN |
| | | * The CSN used as the upper limit when computing the newest change |
| | | * number |
| | | * @param excludedBaseDNs |
| | | * The baseDNs that are excluded from the ECL. |
| | | * @return an array of size 2 holding the oldest and newest change numbers at |
| | | * indexes 0 and 1. |
| | | * @throws DirectoryException |
| | | * When it happens. |
| | | */ |
| | | public long[] getECLChangeNumberLimits(CSN endCSN, |
| | | Set<String> excludedBaseDNs) throws DirectoryException |
| | | public long[] getECLChangeNumberLimits() throws DirectoryException |
| | | { |
| | | /* The content of the CNIndexDB depends on the SEARCH operations done before |
| | | * requesting the change number. If no operations, CNIndexDB is empty. |
| | | * The limits we want to get are the "potential" limits if a request was |
| | | * done, the CNIndexDB is probably not complete to do that. |
| | | * |
| | | * The oldest change number is : |
| | | * - the oldest record from the CNIndexDB |
| | | * - if none because CNIndexDB empty, |
| | | * then |
| | | * if no change in replchangelog then return 0 |
| | | * else return 1 (change number that WILL be returned to next search) |
| | | * |
| | | * The newest change number is : |
| | | * - initialized with the newest record from the CNIndexDB (0 if none) |
| | | * and consider the genState associated |
| | | * - to the newest change number, we add the count of updates in the |
| | | * replchangelog FROM that genState TO the crossDomainEligibleCSN |
| | | * (this diff is done domain by domain) |
| | | */ |
| | | try |
| | | { |
| | | final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); |
| | | final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); |
| | | final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord(); |
| | | |
| | | boolean dbEmpty = true; |
| | | long oldestChangeNumber = 0; |
| | | long newestChangeNumber = 0; |
| | | boolean noCookieForNewestCN = true; |
| | | CSN csnForNewestCN = null; |
| | | DN baseDNForNewestCN = null; |
| | | if (oldestRecord != null) |
| | | if (oldestRecord == null) |
| | | { |
| | | if (newestRecord == null) |
| | | { |
| | | // Edge case: DB was cleaned or closed in between calls to |
| | | // getOldest*() and getNewest*(). |
| | | // The only remaining solution is to fail fast. |
| | | throw new ChangelogException( |
| | | ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get()); |
| | | } |
| | | |
| | | dbEmpty = false; |
| | | oldestChangeNumber = oldestRecord.getChangeNumber(); |
| | | newestChangeNumber = newestRecord.getChangeNumber(); |
| | | |
| | | // Get the generalized state associated with the current newest change |
| | | // number and initializes from it the startStates table |
| | | final String cookie = newestRecord.getPreviousCookie(); |
| | | noCookieForNewestCN = cookie == null || cookie.length() == 0; |
| | | |
| | | csnForNewestCN = newestRecord.getCSN(); |
| | | baseDNForNewestCN = newestRecord.getBaseDN(); |
| | | } |
| | | |
| | | long newestTime = csnForNewestCN != null ? csnForNewestCN.getTime() : 0; |
| | | for (ReplicationServerDomain rsDomain : getReplicationServerDomains()) |
| | | { |
| | | if (contains( |
| | | excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString())) |
| | | continue; |
| | | |
| | | // for this domain, have the state in the replchangelog |
| | | // where the newest change number update is |
| | | long ec; |
| | | if (noCookieForNewestCN) |
| | | { |
| | | // Count changes of this domain from the beginning of the changelog |
| | | final ServerState startState = rsDomain.getOldestState() |
| | | .duplicateOnlyOlderThan(rsDomain.getLatestDomainTrimDate()); |
| | | ec = rsDomain.getEligibleCount(startState, endCSN); |
| | | } |
| | | else |
| | | { |
| | | // There are records in the CNIndexDB (so already returned to clients) |
| | | // BUT |
| | | // There is nothing related to this domain in the newest record |
| | | // (maybe this domain was disabled when this record was returned). |
| | | // In that case, are counted the changes from the time of the most |
| | | // recent change |
| | | |
| | | // And count changes of this domain from the date of the |
| | | // newest seqnum record (that does not refer to this domain) |
| | | CSN csnx = new CSN(newestTime, csnForNewestCN.getSeqnum(), 0); |
| | | ec = rsDomain.getEligibleCount(csnx, endCSN); |
| | | |
| | | if (baseDNForNewestCN.equals(rsDomain.getBaseDN())) |
| | | ec--; |
| | | } |
| | | |
| | | // cumulates on domains |
| | | newestChangeNumber += ec; |
| | | |
| | | // CNIndexDB is empty and there are eligible updates in the replication |
| | | // changelog then init oldest change number |
| | | if (ec > 0 && oldestChangeNumber == 0) |
| | | oldestChangeNumber = 1; |
| | | } |
| | | |
| | | if (dbEmpty) |
| | | { |
| | | // The database was empty, just keep increasing numbers since last time |
| | | // The database is empty, just keep increasing numbers since last time |
| | | // we generated one change number. |
| | | long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber(); |
| | | oldestChangeNumber += lastGeneratedCN; |
| | | newestChangeNumber += lastGeneratedCN; |
| | | final long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber(); |
| | | return new long[] { lastGeneratedCN, lastGeneratedCN }; |
| | | } |
| | | return new long[] { oldestChangeNumber, newestChangeNumber }; |
| | | |
| | | final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord(); |
| | | if (newestRecord == null) |
| | | { |
| | | // Edge case: DB was cleaned (or purged) in between calls to |
| | | // getOldest*() and getNewest*(). |
| | | // The only remaining solution is to fail fast. |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, |
| | | ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get()); |
| | | } |
| | | return new long[] { oldestRecord.getChangeNumber(), |
| | | newestRecord.getChangeNumber() }; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState); |
| | | } |
| | | |
| | | /** |
| | | * Count the number of changes in the replication changelog for the provided |
| | | * serverID, between 2 provided CSNs. |
| | | * @param serverId Identifier of the server for which to compute the count. |
| | | * @param from lower limit CSN. |
| | | * @param to upper limit CSN. |
| | | * @return the number of changes. |
| | | */ |
| | | public long getCount(int serverId, CSN from, CSN to) |
| | | { |
| | | return domainDB.getCount(baseDN, serverId, from, to); |
| | | } |
| | | |
| | | /** |
| | | * Returns the change count for that ReplicationServerDomain. |
| | | * |
| | |
| | | } |
| | | |
| | | /** |
| | | * This methods count the changes, server by server : |
| | | * - from a serverState start point |
| | | * - to (inclusive) an end point (the provided endCSN). |
| | | * @param startState The provided start server state. |
| | | * @param endCSN The provided end CSN. |
| | | * @return The number of changes between startState and endCSN. |
| | | */ |
| | | public long getEligibleCount(ServerState startState, CSN endCSN) |
| | | { |
| | | long res = 0; |
| | | |
| | | for (CSN csn : getLatestServerState()) |
| | | { |
| | | CSN startCSN = startState.getCSN(csn.getServerId()); |
| | | long serverIdRes = getCount(csn.getServerId(), startCSN, endCSN); |
| | | |
| | | // The startPoint is excluded when counting the ECL eligible changes |
| | | if (startCSN != null && serverIdRes > 0) |
| | | { |
| | | serverIdRes--; |
| | | } |
| | | |
| | | res += serverIdRes; |
| | | } |
| | | return res; |
| | | } |
| | | |
| | | /** |
| | | * This methods count the changes, server by server: |
| | | * - from a start CSN |
| | | * - to (inclusive) an end point (the provided endCSN). |
| | | * @param startCSN The provided start CSN. |
| | | * @param endCSN The provided end CSN. |
| | | * @return The number of changes between startTime and endCSN. |
| | | */ |
| | | public long getEligibleCount(CSN startCSN, CSN endCSN) |
| | | { |
| | | long res = 0; |
| | | for (CSN csn : getLatestServerState()) |
| | | { |
| | | int serverId = csn.getServerId(); |
| | | CSN lStartCSN = |
| | | new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId); |
| | | res += getCount(serverId, lStartCSN, endCSN); |
| | | } |
| | | return res; |
| | | } |
| | | |
| | | /** |
| | | * Get the latest (more recent) trim date of the changelog dbs associated |
| | | * to this domain. |
| | | * @return The latest trim date. |
| | |
| | | // serverId methods |
| | | |
| | | /** |
| | | * Return the number of changes inclusive between 2 provided {@link CSN}s for |
| | | * the specified serverId and replication domain. i.e. the <code>from</code> |
| | | * and <code>to</code> CSNs are included in the count. |
| | | * <p> |
| | | * Note that: |
| | | * <ol> |
| | | * <li>If <code>from</code> is null, the count starts at the oldest CSN in the |
| | | * database.</li> |
| | | * <li>If <code>to</code> is null, the count is 0.</li> |
| | | * <li>If both from and to are present, then the count includes them both |
| | | * <code>to</code> is null, the count ends at the newest CSN in the database. |
| | | * </li> |
| | | * <li>incidentally, if both <code>from</code> and <code>to</code> are null, |
| | | * the total count of entries in the replica database is returned.</li> |
| | | * </ol> |
| | | * <h6>Example</h6> |
| | | * <p> |
| | | * Given the following replica database for baseDN "dc=example,dc=com" and |
| | | * serverId 1: |
| | | * |
| | | * <pre> |
| | | * CSN1 <= Oldest |
| | | * CSN2 |
| | | * CSN3 |
| | | * CSN4 |
| | | * CSN5 <= Newest |
| | | * </pre> |
| | | * |
| | | * Then: |
| | | * |
| | | * <pre> |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0); |
| | | * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5); |
| | | * </pre> |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param serverId |
| | | * the serverId on which to act |
| | | * @param from |
| | | * The older CSN where to start the count |
| | | * @param to |
| | | * The newer CSN where to end the count |
| | | * @return The computed number of changes |
| | | */ |
| | | long getCount(DN baseDN, int serverId, CSN from, CSN to); |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the replicaDBs for the specified |
| | | * replication domain, with all cursors starting after the provided CSN. |
| | | * <p> |
| | |
| | | StaticUtils.recursiveDelete(dbDirectory); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getCount(DN baseDN, int serverId, CSN from, CSN to) |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | return replicaDB.getCount(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | |
| | | db.setCounterRecordWindowSize(size); |
| | | } |
| | | |
| | | /** |
| | | * Return the number of changes between 2 provided CSNs. |
| | | * This a alternative to traverseAndCount, expected to be much more efficient |
| | | * when there is a huge number of changes in the Db. |
| | | * @param from The lower (older) CSN. |
| | | * @param to The upper (newer) CSN. |
| | | * @return The computed number of changes. |
| | | */ |
| | | public long getCount(CSN from, CSN to) |
| | | { |
| | | // Now that we always keep the last CSN in the DB to avoid expiring cookies |
| | | // too quickly, we need to check if the "to" is older than the trim date. |
| | | if (to == null || !to.isOlderThan(new CSN(latestTrimDate, 0, 0))) |
| | | { |
| | | flush(); |
| | | return db.count(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | public class ReplicationDB |
| | | { |
| | | private static final int START = 0; |
| | | private static final int STOP = 1; |
| | | |
| | | private Database db; |
| | | private ReplicationDbEnv dbenv; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Count the number of changes between 2 changes numbers (inclusive). |
| | | * @param start The lower limit of the count. |
| | | * @param stop The higher limit of the count. |
| | | * @return The number of changes between provided start and stop CSN. |
| | | * Returns 0 when an error occurs. |
| | | */ |
| | | public long count(CSN start, CSN stop) |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | // If the DB has been closed then return immediately. |
| | | if (isDBClosed()) |
| | | { |
| | | return 0; |
| | | } |
| | | if (start == null && stop == null) |
| | | { |
| | | return db.count(); |
| | | } |
| | | |
| | | int[] counterValues = new int[2]; |
| | | int[] distanceToCounterRecords = new int[2]; |
| | | |
| | | // Step 1 : from the start point, traverse db to the next counter record |
| | | // or to the stop point. |
| | | findFirstCounterRecordAfterStartPoint(start, stop, counterValues, |
| | | distanceToCounterRecords); |
| | | |
| | | // cases |
| | | if (counterValues[START] == 0) |
| | | return distanceToCounterRecords[START]; |
| | | |
| | | // Step 2 : from the stop point, traverse db to the next counter record |
| | | // or to the start point. |
| | | if (!findFirstCounterRecordBeforeStopPoint(start, stop, counterValues, |
| | | distanceToCounterRecords)) |
| | | { |
| | | // database is empty |
| | | return 0; |
| | | } |
| | | |
| | | // Step 3 : Now consolidates the result |
| | | return computeDistance(counterValues, distanceToCounterRecords); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | dbenv.shutdownOnException(e); |
| | | } |
| | | finally |
| | | { |
| | | dbCloseLock.readLock().unlock(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | private void findFirstCounterRecordAfterStartPoint(CSN start, CSN stop, |
| | | int[] counterValues, int[] distanceToCounterRecords) |
| | | throws DatabaseException |
| | | { |
| | | Cursor cursor = db.openCursor(null, null); |
| | | try |
| | | { |
| | | OperationStatus status; |
| | | DatabaseEntry key; |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (start != null) |
| | | { |
| | | key = createReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | // JNR: I suspect this is equivalent to writing cursor.getFirst(). |
| | | // If it is, then please change the code to make it clearer. |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[START] = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | |
| | | // it is a regular change record |
| | | if (csn.isNewerThan(stop)) |
| | | { // we are outside the range: we reached the 'stop' target |
| | | break; |
| | | } |
| | | |
| | | distanceToCounterRecords[START]++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | // loop to update the distance and possibly find a counter record |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private boolean findFirstCounterRecordBeforeStopPoint(CSN start, CSN stop, |
| | | int[] counterValues, int[] distanceToCounterRecords) |
| | | throws DatabaseException |
| | | { |
| | | Cursor cursor = db.openCursor(null, null); |
| | | try |
| | | { |
| | | DatabaseEntry key = createReplicationKey(stop); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[STOP] = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | |
| | | // it is a regular change record |
| | | if (csn.isOlderThan(start)) |
| | | { // we are outside the range: we reached the 'start' target |
| | | break; |
| | | } |
| | | |
| | | distanceToCounterRecords[STOP]++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | // loop to update the distance and possibly find a counter record |
| | | } |
| | | return true; |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The diagram below shows a visual description of how the distance between |
| | | * two CSNs in the database is computed. |
| | | * |
| | | * <pre> |
| | | * +--------+ +--------+ |
| | | * | CASE 1 | | CASE 2 | |
| | | * +--------+ +--------+ |
| | | * |
| | | * CSN CSN |
| | | * ----- ----- |
| | | * START => ----- START => ----- |
| | | * ^ ----- ^ ----- |
| | | * | ----- | ----- |
| | | * dist 1 ----- dist 1 ----- |
| | | * | ----- | ----- |
| | | * v ----- v ----- |
| | | * CR 1&2 => [1000] CR 1 => [1000] |
| | | * ^ ----- ----- |
| | | * | ----- ----- |
| | | * dist 2 ----- ----- |
| | | * | ----- ----- |
| | | * v ----- ----- |
| | | * STOP => ----- ----- |
| | | * ----- ----- |
| | | * CR => [2000] CR 2 => [2000] |
| | | * ----- ^ ----- |
| | | * | ----- |
| | | * dist 2 ----- |
| | | * | ----- |
| | | * v ----- |
| | | * STOP => ----- |
| | | * </pre> |
| | | * |
| | | * Explanation of the terms used: |
| | | * <dl> |
| | | * <dt>START</dt> |
| | | * <dd>Start CSN for the count</dd> |
| | | * <dt>STOP</dt> |
| | | * <dd>Stop CSN for the count</dd> |
| | | * <dt>dist</dt> |
| | | * <dd>Distance from START (or STOP) to the counter record</dd> |
| | | * <dt>CSN</dt> |
| | | * <dd>Stands for "Change Sequence Number". Below it, the database is |
| | | * symbolized, where each record is represented by using dashes "-----". The |
| | | * database is ordered.</dd> |
| | | * <dt>CR</dt> |
| | | * <dd>Stands for "Counter Record". Counter Records are inserted in the |
| | | * database along with real CSNs, but they are not real changes. They are only |
| | | * used to speed up calculating the distance between 2 CSNs without the need |
| | | * to scan the whole database in between.</dd> |
| | | * </dl> |
| | | */ |
| | | private long computeDistance(int[] counterValues, |
| | | int[] distanceToCounterRecords) |
| | | { |
| | | if (counterValues[START] != 0) |
| | | { |
| | | if (counterValues[START] == counterValues[STOP]) |
| | | { |
| | | // only one counter record between from and to - no need to use it |
| | | return distanceToCounterRecords[START] + distanceToCounterRecords[STOP]; |
| | | } |
| | | // at least 2 counter records between from and to |
| | | return distanceToCounterRecords[START] |
| | | + (counterValues[STOP] - counterValues[START]) |
| | | + distanceToCounterRecords[STOP]; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Whether a provided CSN represents a counter record. A counter record is |
| | | * used to store the time. |
| | | * |
| | |
| | | import org.opends.server.tools.LDAPWriter; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.LDIFWriter; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | |
| | | ECLSimultaneousPsearches(); |
| | | } |
| | | |
| | | @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) |
| | | public void ECLReplicationServerFullTest10() throws Exception |
| | | { |
| | | // Test eligible count method. |
| | | ECLGetEligibleCountTest(); |
| | | } |
| | | |
| | | // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned |
| | | // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned |
| | | // TODO:ECL Test invalid DN in cookie returns UNWILLING + message |
| | |
| | | return null; |
| | | } |
| | | |
| | | private void ECLGetEligibleCountTest() throws Exception |
| | | { |
| | | String tn = "ECLGetEligibleCountTest"; |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | String user1entryUUID = "11111111-1112-1113-1114-111111111115"; |
| | | |
| | | final CSN[] csns = generateCSNs(4, SERVER_ID_1); |
| | | final CSN csn1 = csns[0]; |
| | | final CSN csn2 = csns[1]; |
| | | final CSN csn3 = csns[2]; |
| | | |
| | | getCNIndexDB().setPurgeDelay(0); |
| | | ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN); |
| | | // this empty state will force to count from the start of the DB |
| | | final ServerState fromStart = new ServerState(); |
| | | |
| | | // The replication changelog is empty |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, csns[0]), 0); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 1000, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | // Publish one first message |
| | | DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | Thread.sleep(300); |
| | | |
| | | // From begin to now : 1 change |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, now()), 1); |
| | | |
| | | // Publish one second message |
| | | delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn2, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | |
| | | // From begin to now : 2 changes |
| | | searchOnChangelog("(changenumber>=1)", 2, tn, SUCCESS); |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, now()), 2); |
| | | |
| | | // From begin to first change (inclusive) : 1 change = csn1 |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, csn1), 1); |
| | | |
| | | final ServerState fromStateBeforeCSN1 = new ServerState(); |
| | | fromStateBeforeCSN1.update(csn1); |
| | | |
| | | // From state/csn1(exclusive) to csn1 (inclusive) : 0 change |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN1, csn1), 0); |
| | | |
| | | // From state/csn1(exclusive) to csn2 (inclusive) : 1 change = csn2 |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN1, csn2), 1); |
| | | |
| | | final ServerState fromStateBeforeCSN2 = new ServerState(); |
| | | fromStateBeforeCSN2.update(csn2); |
| | | |
| | | // From state/csn2(exclusive) to now (inclusive) : 0 change |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN2, now()), 0); |
| | | |
| | | // Publish one third message |
| | | delMsg = newDeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, csn3, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | Thread.sleep(300); |
| | | |
| | | fromStateBeforeCSN2.update(csn2); |
| | | |
| | | // From state/csn2(exclusive) to now : 1 change = csn3 |
| | | assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN2, now()), 1); |
| | | |
| | | boolean perfs=false; |
| | | if (perfs) |
| | | { |
| | | // number of msgs used by the test |
| | | final int maxMsg = 999999; |
| | | |
| | | // We need an RS configured with a window size bigger than the number |
| | | // of msg used by the test. |
| | | assertTrue(maxMsg<maxWindow); |
| | | debugInfo(tn, "Perf test in compat mode - will generate " + maxMsg + " msgs."); |
| | | for (int i=4; i<=maxMsg; i++) |
| | | { |
| | | CSN csnx = new CSN(TimeThread.getTime(), i, SERVER_ID_1); |
| | | delMsg = newDeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, csnx, user1entryUUID); |
| | | server01.publish(delMsg); |
| | | } |
| | | Thread.sleep(1000); |
| | | debugInfo(tn, "Perfs test in compat - search lastChangeNumber"); |
| | | Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | long t1 = TimeThread.getTime(); |
| | | long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | replicationServer.getEligibleCSN(excludedDomains), excludedDomains); |
| | | assertEquals(limits[1], maxMsg); |
| | | long t2 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - " + maxMsg + " counted in (ms):" + (t2 - t1)); |
| | | |
| | | String filter = "(changenumber>=" + maxMsg + ")"; |
| | | searchOnChangelog(filter, 1, tn, SUCCESS); |
| | | long t3 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - last change searched in (ms):" + (t3 - t2)); |
| | | |
| | | filter = "(changenumber>=" + maxMsg + ")"; |
| | | searchOnChangelog(filter, 1, tn, SUCCESS); |
| | | long t4 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - last change searched in (ms):" + (t4 - t3)); |
| | | |
| | | filter = "(changenumber>=" + (maxMsg - 2) + ")"; |
| | | searchOnChangelog(filter, 3, tn, SUCCESS); |
| | | long t5 = TimeThread.getTime(); |
| | | debugInfo(tn, "Perfs - last 3 changes searched in (ms):" + (t5 - t4)); |
| | | } |
| | | stop(server01); |
| | | debugInfo(tn, "Ending test with success"); |
| | | } |
| | | |
| | | private CSN now() |
| | | { |
| | | return new CSN(TimeThread.getTime(), 1, SERVER_ID_1); |
| | | } |
| | | |
| | | /** |
| | | * Test ECl entry attributes, and there configuration. |
| | | */ |
| | |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetCountNoCounterRecords() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5); |
| | | for (CSN csn : csns) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | replicaDB.flush(); |
| | | |
| | | assertEquals(replicaDB.getCount(csns[0], csns[0]), 1); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[1]), 2); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[4]), 5); |
| | | assertEquals(replicaDB.getCount(null, csns[4]), 5); |
| | | assertEquals(replicaDB.getCount(csns[0], null), 0); |
| | | assertEquals(replicaDB.getCount(null, null), 5); |
| | | } |
| | | finally |
| | | { |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test the logic that manages counter records in the JEReplicaDB in order to |
| | | * optimize the counting of record in the replication changelog db. |
| | | * optimize the oldest and newest records in the replication changelog db. |
| | | */ |
| | | @Test(enabled=true, groups = { "opendj-256" }) |
| | | void testDbCounts() throws Exception |
| | | void testGetOldestNewestCSNs() throws Exception |
| | | { |
| | | // FIXME: for some reason this test is always failing in Jenkins when run as |
| | | // part of the unit tests. Here is the output (the failure is 100% |
| | | // reproducible and always has the same value of 3004): |
| | | // |
| | | // Failed Test: |
| | | // org.opends.server.replication.server.JEReplicaDBTest#testDbCounts |
| | | // [testng] Failure Cause: java.lang.AssertionError: AFTER PURGE |
| | | // expected:<8000> but was:<3004> |
| | | // [testng] org.testng.Assert.fail(Assert.java:84) |
| | | // [testng] org.testng.Assert.failNotEquals(Assert.java:438) |
| | | // [testng] org.testng.Assert.assertEquals(Assert.java:108) |
| | | // [testng] org.testng.Assert.assertEquals(Assert.java:323) |
| | | // [testng] |
| | | // org.opends.server.replication.server.JEReplicaDBTest.testDBCount(JEReplicaDBTest.java:594) |
| | | // [testng] |
| | | // org.opends.server.replication.server.JEReplicaDBTest.testDbCounts(JEReplicaDBTest.java:389) |
| | | |
| | | // It's worth testing with 2 different setting for counterRecord |
| | | // - a counter record is put every 10 Update msg in the db - just a unit |
| | | // setting. |
| | |
| | | // - when start and stop are after the first counter record, |
| | | // - when start and stop are before and after more than one counter record, |
| | | // After a purge. |
| | | // After shutdowning/closing and reopening the db. |
| | | testDBCount(40, 10); |
| | | // FIXME next line is the one failing with the stacktrace above |
| | | testDBCount(4000, 1000); |
| | | // After shutting down/closing and reopening the db. |
| | | testGetOldestNewestCSNs(40, 10); |
| | | testGetOldestNewestCSNs(4000, 1000); |
| | | } |
| | | |
| | | private void testDBCount(int max, int counterWindow) throws Exception |
| | | private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception |
| | | { |
| | | String tn = "testDBCount("+max+","+counterWindow+")"; |
| | | debugInfo(tn, "Starting test"); |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | // Test count in different subcases trying to handle all special cases |
| | | // regarding the 'counter' record and 'count' algorithm |
| | | assertCount(tn, replicaDB, csns[1], csns[1], 1, "FROM change1 TO change1 "); |
| | | assertCount(tn, replicaDB, csns[1], csns[2], 2, "FROM change1 TO change2 "); |
| | | assertCount(tn, replicaDB, csns[1], csns[counterWindow], counterWindow, |
| | | "FROM change1 TO counterWindow=" + counterWindow); |
| | | |
| | | final int j = counterWindow + 1; |
| | | assertCount(tn, replicaDB, csns[1], csns[j], j, |
| | | "FROM change1 TO counterWindow+1=" + j); |
| | | final int k = 2 * counterWindow; |
| | | assertCount(tn, replicaDB, csns[1], csns[k], k, |
| | | "FROM change1 TO 2*counterWindow=" + k); |
| | | final int l = k + 1; |
| | | assertCount(tn, replicaDB, csns[1], csns[l], l, |
| | | "FROM change1 TO 2*counterWindow+1=" + l); |
| | | assertCount(tn, replicaDB, csns[2], csns[5], 4, |
| | | "FROM change2 TO change5 "); |
| | | assertCount(tn, replicaDB, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4, |
| | | "FROM counterWindow+2 TO counterWindow+5 "); |
| | | assertCount(tn, replicaDB, csns[2], csns[(counterWindow + 5)], counterWindow + 4, |
| | | "FROM change2 TO counterWindow+5 "); |
| | | assertCount(tn, replicaDB, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1, |
| | | "FROM counterWindow+4 TO counterWindow+4 "); |
| | | |
| | | CSN olderThanOldest = null; |
| | | CSN newerThanNewest = new CSN(System.currentTimeMillis() + (2*(max+1)), 100, 1); |
| | | |
| | | // Now we want to test with start and stop outside of the db |
| | | |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | assertCount(tn, replicaDB, olderThanOldest, newerThanNewest, max, |
| | | "FROM null (start of time) TO now (> newest change in the db)"); |
| | | |
| | | // Now we want to test that after closing and reopening the db, the |
| | | // counting algo is well reinitialized and when new messages are added |
| | | // the new counter are correctly generated. |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | |
| | | // Populate the db with 'max' msg |
| | | for (int i=max+1; i<=(2*max); i++) |
| | | { |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, 2 * max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | |
| | | // |
| | | |
| | | replicaDB.setPurgeDelay(100); |
| | | sleep(4000); |
| | | long totalCount = replicaDB.getCount(null, null); |
| | | debugInfo(tn, "FROM our first generated change TO now (> newest change in the db)" + " After purge, total count=" + totalCount); |
| | | sleep(1000); |
| | | |
| | | String testcase = "AFTER PURGE (oldest, newest)="; |
| | | debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN()); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest="); |
| | | |
| | | int expectedCnt; |
| | | if (totalCount>1) |
| | | { |
| | | final int newestSeqnum = replicaDB.getNewestCSN().getSeqnum(); |
| | | final int oldestSeqnum = replicaDB.getOldestCSN().getSeqnum(); |
| | | expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1; |
| | | } |
| | | else |
| | | { |
| | | expectedCnt = 0; |
| | | } |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE"); |
| | | |
| | | // Clear ... |
| | | debugInfo(tn,"clear:"); |
| | | replicaDB.clear(); |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertCount(String tn, JEReplicaDB replicaDB, CSN from, CSN to, |
| | | int expectedCount, String testcase) |
| | | { |
| | | long actualCount = replicaDB.getCount(from, to); |
| | | debugInfo(tn, testcase + " actualCount=" + actualCount); |
| | | assertEquals(actualCount, expectedCount, testcase); |
| | | } |
| | | |
| | | } |