opends/src/messages/messages/replication.properties
@@ -492,7 +492,7 @@ SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \ failed to parse change record with changenumber %s from the database. Error: %s SEVERE_ERR_SESSION_STARTUP_INTERRUPTED_216=%s was interrupted in the startup phase MILD_ERR_READING_FIRST_THEN_LAST_IN_CHANGENUMBER_DATABASE_217=An error occurred \ when accessing the change number database: impossible to read the last record \ after having successfully read the first. Database might have been cleaned or \ MILD_ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE_217=An error occurred \ when accessing the change number database: impossible to read the newest record \ after having successfully read the oldest. Database might have been cleaned or \ closed between successive reads opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -94,7 +94,7 @@ @Override() public void finalizeVirtualAttributeProvider() { // // nothing to finalize } @@ -126,7 +126,7 @@ @Override() public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) { String first = "0"; String value = "0"; try { ECLWorkflowElement eclwe = (ECLWorkflowElement) @@ -138,26 +138,22 @@ MultimasterReplication.getECLDisabledDomains(); excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); ReplicationServer rs = eclwe.getReplicationServer(); long[] limits = rs.getECLChangeNumberLimits( final ReplicationServer rs = eclwe.getReplicationServer(); final long[] limits = rs.getECLChangeNumberLimits( rs.getEligibleCSN(excludedDomains), excludedDomains); first = String.valueOf(limits[0]); value = String.valueOf(limits[0]); } } catch(Exception e) { // We got an error computing the first change number. // We got an error computing this change number. // Rather than returning 0 which is no change, return -1 to // indicate the error. first = "-1"; value = "-1"; TRACER.debugCaught(DebugLogLevel.ERROR, e); } AttributeValue value = AttributeValues.create( ByteString.valueOf(first), ByteString.valueOf(first)); return Collections.singleton(value); ByteString valueBS = ByteString.valueOf(value); return Collections.singleton(AttributeValues.create(valueBS, valueBS)); } @@ -170,7 +166,7 @@ SearchOperation searchOperation, boolean isPreIndexed) { // We do not allow search for the firstChangeNumber. It's a read-only // We do not allow search for this change number. It's a read-only // attribute of the RootDSE. return false; } opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -94,7 +94,7 @@ @Override() public void finalizeVirtualAttributeProvider() { // // nothing to finalize } @@ -126,7 +126,7 @@ @Override() public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) { String last = "0"; String value = "0"; try { ECLWorkflowElement eclwe = (ECLWorkflowElement) @@ -138,26 +138,22 @@ MultimasterReplication.getECLDisabledDomains(); excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); ReplicationServer rs = eclwe.getReplicationServer(); long[] limits = rs.getECLChangeNumberLimits( final ReplicationServer rs = eclwe.getReplicationServer(); final long[] limits = rs.getECLChangeNumberLimits( rs.getEligibleCSN(excludedDomains), excludedDomains); last = String.valueOf(limits[1]); value = String.valueOf(limits[1]); } } catch(Exception e) { // We got an error computing the first change number. // We got an error computing this change number. // Rather than returning 0 which is no change, return -1 to // indicate the error. last = "-1"; value = "-1"; TRACER.debugCaught(DebugLogLevel.ERROR, e); } AttributeValue value = AttributeValues.create( ByteString.valueOf(last), ByteString.valueOf(last)); return Collections.singleton(value); ByteString valueBS = ByteString.valueOf(value); return Collections.singleton(AttributeValues.create(valueBS, valueBS)); } @@ -170,7 +166,7 @@ SearchOperation searchOperation, boolean isPreIndexed) { // We do not allow search for the lastChangeNumber. It's a read-only // We do not allow search for this change number. It's a read-only // attribute of the RootDSE. return false; } opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
@@ -27,10 +27,7 @@ */ package org.opends.server.replication.common; import static org.opends.messages.ExtensionMessages.*; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -41,12 +38,16 @@ import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.*; import org.opends.server.util.ServerConstants; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import static org.opends.messages.ExtensionMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; /** * This class implements a virtual attribute provider in the root-dse entry * that contains the last (newest) cookie (cross domain state) @@ -58,6 +59,11 @@ { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * Creates a new instance of this member virtual attribute provider. */ public LastCookieVirtualProvider() @@ -89,7 +95,7 @@ @Override() public void finalizeVirtualAttributeProvider() { // // nothing to finalize } /** @@ -120,7 +126,6 @@ @Override() public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) { Set<AttributeValue> values = new HashSet<AttributeValue>(); try { ECLWorkflowElement eclwe = (ECLWorkflowElement) @@ -132,22 +137,17 @@ MultimasterReplication.getECLDisabledDomains(); excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); ReplicationServer rs = eclwe.getReplicationServer(); MultiDomainServerState lastCookie = rs.getLastECLCookie(excludedDomains); AttributeValue value = AttributeValues.create( ByteString.valueOf(lastCookie.toString()), ByteString.valueOf(lastCookie.toString())); values=Collections.singleton(value); final ReplicationServer rs = eclwe.getReplicationServer(); String newestCookie = rs.getNewestECLCookie(excludedDomains).toString(); final ByteString cookie = ByteString.valueOf(newestCookie); return Collections.singleton(AttributeValues.create(cookie, cookie)); } return values; } catch(Exception e) catch (Exception e) { return values; TRACER.debugCaught(DebugLogLevel.ERROR, e); } return Collections.emptySet(); } /** opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -277,10 +277,10 @@ } else if (newMsg.getCSN().getTime() >= domainLatestTrimDate) { // when the replication changelog is trimmed, the last (latest) chg // is left in the db (whatever its age), and we don't want this chg // when the replication changelog is trimmed, the newest change // is left in the DB (whatever its age), and we don't want this change // to be returned in the external changelog. // So let's check if the chg time is older than the trim date // So let's check if the change time is older than the trim date return newMsg; } } @@ -528,10 +528,10 @@ } /** * Initialize the handler from a provided first change number. * Initialize the handler from a provided start change number. * * @param startChangeNumber * The provided first change number. * The provided start change number. * @throws DirectoryException * When an error is raised. */ @@ -582,29 +582,28 @@ if (startChangeNumber <= 1) { // Request filter DOES NOT contain any first change number // So we'll generate from the first change number in the CNIndexDB final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord(); if (firstCNRecord == null) // Request filter DOES NOT contain any start change number // So we'll generate from the oldest change number in the CNIndexDB final CNIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); if (oldestRecord == null) { // DB is empty or closed isEndOfCNIndexDBReached = true; return null; } final long firstChangeNumber = firstCNRecord.getChangeNumber(); final String crossDomainStartState = firstCNRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); final String crossDomainStartState = oldestRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestRecord.getChangeNumber()); return crossDomainStartState; } // Request filter DOES contain a startChangeNumber // Read the CNIndexDB to see whether it contains startChangeNumber CNIndexRecord startCNRecord = cnIndexDB.getRecord(startChangeNumber); if (startCNRecord != null) CNIndexRecord startRecord = cnIndexDB.getRecord(startChangeNumber); if (startRecord != null) { // found the provided startChangeNumber, let's return it final String crossDomainStartState = startCNRecord.getPreviousCookie(); final String crossDomainStartState = startRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber); return crossDomainStartState; } @@ -613,50 +612,49 @@ /* * Get the changeNumberLimits (from the eligibleCSN obtained at the start of * this method) in order to have the first and last change numbers. * this method) in order to have the oldest and newest change numbers. */ final long[] limits = replicationServer.getECLChangeNumberLimits( eligibleCSN, excludedBaseDNs); final long firstChangeNumber = limits[0]; final long lastChangeNumber = limits[1]; final long oldestChangeNumber = limits[0]; final long newestChangeNumber = limits[1]; // If the startChangeNumber provided is lower than the firstChangeNumber in // If the startChangeNumber provided is lower than the oldestChangeNumber in // the DB, let's use the lower limit. if (startChangeNumber < firstChangeNumber) if (startChangeNumber < oldestChangeNumber) { CNIndexRecord firstCNRecord = cnIndexDB.getRecord(firstChangeNumber); if (firstCNRecord == null) CNIndexRecord oldestRecord = cnIndexDB.getRecord(oldestChangeNumber); if (oldestRecord == null) { // This should not happen isEndOfCNIndexDBReached = true; return null; } final String crossDomainStartState = firstCNRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); final String crossDomainStartState = oldestRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestChangeNumber); return crossDomainStartState; } else if (startChangeNumber <= lastChangeNumber) else if (startChangeNumber <= newestChangeNumber) { // startChangeNumber is between first and potential last and has never // startChangeNumber is between oldest and potential newest and has never // been returned yet final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); if (lastCNRecord == null) final CNIndexRecord newestRecord = cnIndexDB.getNewestRecord(); if (newestRecord == null) { isEndOfCNIndexDBReached = true; return null; } final long lastKey = lastCNRecord.getChangeNumber(); final String crossDomainStartState = lastCNRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); final String crossDomainStartState = newestRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(newestRecord.getChangeNumber()); return crossDomainStartState; // TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ... // this may be very long. Work on perf improvement here. } // startChangeNumber is greater than the potential lastChangeNumber // startChangeNumber is greater than the potential newest change number throw new DirectoryException(ResultCode.SUCCESS, Message.raw("")); } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1363,18 +1363,18 @@ } /** * Get first and last change number. * Get oldest and newest change numbers. * * @param crossDomainEligibleCSN * @param maxOldestChangeNumber * The provided crossDomainEligibleCSN used as the upper limit for * the last change number * the oldest change number * @param excludedBaseDNs * The baseDNs that are excluded from the ECL. * @return The first and last change numbers. * @return The oldest and newest change numbers. * @throws DirectoryException * When it happens. */ public long[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN, public long[] getECLChangeNumberLimits(CSN maxOldestChangeNumber, Set<String> excludedBaseDNs) throws DirectoryException { /* The content of the CNIndexDB depends on the SEARCH operations done before @@ -1382,103 +1382,105 @@ * The limits we want to get are the "potential" limits if a request was * done, the CNIndexDB is probably not complete to do that. * * The first change number is : * - the first record from the CNIndexDB * The oldest change number is : * - the oldest record from the CNIndexDB * - if none because CNIndexDB empty, * then * if no change in replchangelog then return 0 * else return 1 (change number that WILL be returned to next search) * * The last change number is : * - initialized with the last record from the CNIndexDB (0 if none) * The newest change number is : * - initialized with the newest record from the CNIndexDB (0 if none) * and consider the genState associated * - to the last change number, we add the count of updates in the * - to the newest change number, we add the count of updates in the * replchangelog FROM that genState TO the crossDomainEligibleCSN * (this diff is done domain by domain) */ try { boolean dbEmpty = true; long firstChangeNumber = 0; long lastChangeNumber = 0; long oldestChangeNumber = 0; long newestChangeNumber = 0; final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord(); final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); final CNIndexRecord oldestCNRecord = cnIndexDB.getOldestRecord(); final CNIndexRecord newestCNRecord = cnIndexDB.getNewestRecord(); boolean noCookieForLastCN = true; CSN csnForLastCN = null; DN domainForLastCN = null; if (firstCNRecord != null) boolean noCookieForNewestCN = true; CSN csnForNewestCN = null; DN baseDNForNewestCN = null; if (oldestCNRecord != null) { if (lastCNRecord == null) if (newestCNRecord == null) { // Edge case: DB was cleaned or closed in between call to getFirst*() // and getLast*(). The only remaining solution is to fail fast. // Edge case: DB was cleaned or closed in between calls to // getOldest*() and getNewest*(). // The only remaining solution is to fail fast. throw new ChangelogException( ERR_READING_FIRST_THEN_LAST_IN_CHANGENUMBER_DATABASE.get()); ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get()); } dbEmpty = false; firstChangeNumber = firstCNRecord.getChangeNumber(); lastChangeNumber = lastCNRecord.getChangeNumber(); oldestChangeNumber = oldestCNRecord.getChangeNumber(); newestChangeNumber = newestCNRecord.getChangeNumber(); // Get the generalized state associated with the current last change // Get the generalized state associated with the current newest change // number and initializes from it the startStates table String lastCNGenState = lastCNRecord.getPreviousCookie(); noCookieForLastCN = lastCNGenState == null || lastCNGenState.length() == 0; String newestCNGenState = newestCNRecord.getPreviousCookie(); noCookieForNewestCN = newestCNGenState == null || newestCNGenState.length() == 0; csnForLastCN = lastCNRecord.getCSN(); domainForLastCN = lastCNRecord.getBaseDN(); csnForNewestCN = newestCNRecord.getCSN(); baseDNForNewestCN = newestCNRecord.getBaseDN(); } long newestDate = 0; for (ReplicationServerDomain rsd : getReplicationServerDomains()) for (ReplicationServerDomain rsDomain : getReplicationServerDomains()) { if (contains(excludedBaseDNs, rsd.getBaseDN().toNormalizedString())) if (contains( excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString())) continue; // for this domain, have the state in the replchangelog // where the last change number update is // where the newest change number update is long ec; if (noCookieForLastCN) if (noCookieForNewestCN) { // Count changes of this domain from the beginning of the changelog CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0); ec = rsd.getEligibleCount( rsd.getStartState().duplicateOnlyOlderThan(trimCSN), crossDomainEligibleCSN); CSN trimCSN = new CSN(rsDomain.getLatestDomainTrimDate(), 0, 0); ec = rsDomain.getEligibleCount( rsDomain.getStartState().duplicateOnlyOlderThan(trimCSN), maxOldestChangeNumber); } else { // There are records in the CNIndexDB (so already returned to clients) // BUT // There is nothing related to this domain in the last CNIndexRecord // There is nothing related to this domain in the newest CNIndexRecord // (may be this domain was disabled when this record was returned). // In that case, are counted the changes from // the date of the most recent change from this last CNIndexRecord // the date of the most recent change from this newest CNIndexRecord if (newestDate == 0) { newestDate = csnForLastCN.getTime(); newestDate = csnForNewestCN.getTime(); } // And count changes of this domain from the date of the // lastseqnum record (that does not refer to this domain) CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0); ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN); // newest seqnum record (that does not refer to this domain) CSN csnx = new CSN(newestDate, csnForNewestCN.getSeqnum(), 0); ec = rsDomain.getEligibleCount(csnx, maxOldestChangeNumber); if (domainForLastCN.equals(rsd.getBaseDN())) if (baseDNForNewestCN.equals(rsDomain.getBaseDN())) ec--; } // cumulates on domains lastChangeNumber += ec; newestChangeNumber += ec; // CNIndexDB is empty and there are eligible updates in the replication // changelog then init first change number if (ec > 0 && firstChangeNumber == 0) firstChangeNumber = 1; // changelog then init oldest change number if (ec > 0 && oldestChangeNumber == 0) oldestChangeNumber = 1; } if (dbEmpty) @@ -1486,10 +1488,10 @@ // The database was empty, just keep increasing numbers since last time // we generated one change number. long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber(); firstChangeNumber += lastGeneratedCN; lastChangeNumber += lastGeneratedCN; oldestChangeNumber += lastGeneratedCN; newestChangeNumber += lastGeneratedCN; } return new long[] { firstChangeNumber, lastChangeNumber }; return new long[] { oldestChangeNumber, newestChangeNumber }; } catch (ChangelogException e) { @@ -1498,11 +1500,13 @@ } /** * Returns the last (newest) cookie value. * @param excludedBaseDNs The list of baseDNs excluded from ECL. * @return the last cookie value. * Returns the newest cookie value. * * @param excludedBaseDNs * The list of baseDNs excluded from ECL. * @return the newest cookie value. */ public MultiDomainServerState getLastECLCookie(Set<String> excludedBaseDNs) public MultiDomainServerState getNewestECLCookie(Set<String> excludedBaseDNs) { // Initialize start state for all running domains with empty state MultiDomainServerState result = new MultiDomainServerState(); opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -46,7 +46,7 @@ /** * Returns the last generated change number. * * @return the lastGeneratedChangeNumber * @return the last generated change number */ long getLastGeneratedChangeNumber(); @@ -62,31 +62,30 @@ CNIndexRecord getRecord(long changeNumber) throws ChangelogException; /** * Get the first record stored in this DB. * Get the oldest record stored in this DB. * * @return Returns the first {@link CNIndexRecord} in this DB, null when the * @return Returns the oldest {@link CNIndexRecord} in this DB, null when the * DB is empty or closed * @throws ChangelogException * if a database problem occurs. */ CNIndexRecord getFirstRecord() throws ChangelogException; CNIndexRecord getOldestRecord() throws ChangelogException; /** * Get the last record stored in this DB. * Get the newest record stored in this DB. * * @return Returns the last {@link CNIndexRecord} in this DB, null when the DB * is empty or closed * @return Returns the newest {@link CNIndexRecord} in this DB, null when the * DB is empty or closed * @throws ChangelogException * if a database problem occurs. */ CNIndexRecord getLastRecord() throws ChangelogException; CNIndexRecord getNewestRecord() throws ChangelogException; /** * Add an update to the list of messages that must be saved to this DB managed * by this DB. * by this DB and return the changeNumber associated to this record. * <p> * This method is blocking if the size of the list of message is larger than * its maximum. * Note: this method disregards the changeNumber in the provided record. * <p> * FIXME will be removed when ECLServerHandler will not be responsible anymore * for lazily building the ChangeNumberIndexDB. opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -74,15 +74,15 @@ private DraftCNDB db; /** * FIXME Is this field that useful? {@link #getFirstChangeNumber()} does not * FIXME Is this field that useful? {@link #getOldestChangeNumber()} does not * even use it! */ private long firstChangeNumber = NO_KEY; private volatile long oldestChangeNumber = NO_KEY; /** * FIXME Is this field that useful? {@link #getLastChangeNumber()} does not * FIXME Is this field that useful? {@link #getNewestChangeNumber()} does not * even use it! */ private long lastChangeNumber = NO_KEY; private volatile long newestChangeNumber = NO_KEY; /** The last generated value for the change number. */ private final AtomicLong lastGeneratedChangeNumber; private DbMonitorProvider dbMonitor = new DbMonitorProvider(); @@ -102,7 +102,7 @@ * FIXME it never gets updated even when the replication server purge delay is * updated */ private long trimAge; private volatile long trimAge; private ReplicationServer replicationServer; @@ -123,14 +123,14 @@ // DB initialization db = new DraftCNDB(dbenv); final CNIndexRecord firstRecord = db.readFirstRecord(); final CNIndexRecord lastRecord = db.readLastRecord(); firstChangeNumber = getChangeNumber(firstRecord); lastChangeNumber = getChangeNumber(lastRecord); final CNIndexRecord oldestRecord = db.readFirstRecord(); final CNIndexRecord newestRecord = db.readLastRecord(); oldestChangeNumber = getChangeNumber(oldestRecord); newestChangeNumber = getChangeNumber(newestRecord); // initialization of the lastGeneratedChangeNumber from the DB content // if DB is empty => last record does not exist => default to 0 lastGeneratedChangeNumber = new AtomicLong((lastRecord != null) ? lastRecord.getChangeNumber() : 0); long newestCN = (newestRecord != null) ? newestRecord.getChangeNumber() : 0; lastGeneratedChangeNumber = new AtomicLong(newestCN); // Trimming thread thread = @@ -168,14 +168,14 @@ /** {@inheritDoc} */ @Override public CNIndexRecord getFirstRecord() throws ChangelogException public CNIndexRecord getOldestRecord() throws ChangelogException { return db.readFirstRecord(); } /** {@inheritDoc} */ @Override public CNIndexRecord getLastRecord() throws ChangelogException public CNIndexRecord getNewestRecord() throws ChangelogException { return db.readLastRecord(); } @@ -211,7 +211,7 @@ */ public boolean isEmpty() throws ChangelogException { return getLastRecord() == null; return getNewestRecord() == null; } /** @@ -375,12 +375,12 @@ continue; } // Purge up to wherever the other DBs have been purged to. // FIXME there is an opportunity for a phantom record in the current // DB if the replicaDB gets purged after the next if statement. final CSN csn = record.getCSN(); final ServerState startState = domain.getStartState(); final CSN fcsn = startState.getCSN(csn.getServerId()); final long currentChangeNumber = record.getChangeNumber(); if (csn.isOlderThan(fcsn)) { cursor.delete(); @@ -402,6 +402,7 @@ catch(Exception e) { // We could not parse the MultiDomainServerState from the record // FIXME this is quite an aggressive delete() cursor.delete(); continue; } @@ -417,7 +418,7 @@ continue; } firstChangeNumber = currentChangeNumber; oldestChangeNumber = record.getChangeNumber(); cursor.close(); return; } @@ -515,8 +516,8 @@ @Override public String toString() { return "JEChangeNumberIndexDB: " + firstChangeNumber + " " + lastChangeNumber; return getClass().getSimpleName() + ": " + oldestChangeNumber + " " + newestChangeNumber; } /** @@ -537,8 +538,8 @@ public void clear() throws ChangelogException { db.clear(); firstChangeNumber = getChangeNumber(db.readFirstRecord()); lastChangeNumber = getChangeNumber(db.readLastRecord()); oldestChangeNumber = getChangeNumber(db.readFirstRecord()); newestChangeNumber = getChangeNumber(db.readLastRecord()); } private ReentrantLock lock = new ReentrantLock(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -71,24 +71,11 @@ JEChangeNumberIndexDB cnIndexDB = null; try { TestCaseUtils.startServer(); int changelogPort = TestCaseUtils.findFreePort(); // configure a ReplicationServer. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, 100, null); replicationServer = new ReplicationServer(conf); replicationServer = newReplicationServer(); cnIndexDB = newCNIndexDB(replicationServer); cnIndexDB.setPurgeDelay(0); // Prepare data to be stored in the db int cn1 = 3; int cn2 = 4; int cn3 = 5; String value1 = "value1"; String value2 = "value2"; String value3 = "value3"; @@ -100,16 +87,16 @@ CSN[] csns = newCSNs(1, 0, 3); // Add records cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0])); cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1])); cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2])); long cn1 = cnIndexDB.addRecord(new CNIndexRecord(value1, baseDN1, csns[0])); cnIndexDB.addRecord(new CNIndexRecord(value2, baseDN2, csns[1])); long cn3 = cnIndexDB.addRecord(new CNIndexRecord(value3, baseDN3, csns[2])); // The ChangeNumber should not get purged final long firstChangeNumber = cnIndexDB.getFirstRecord().getChangeNumber(); assertEquals(firstChangeNumber, cn1); assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3); final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber(); assertEquals(oldestCN, cn1); assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3); DraftCNDBCursor dbc = cnIndexDB.getReadCursor(firstChangeNumber); DraftCNDBCursor dbc = cnIndexDB.getReadCursor(oldestCN); try { assertEqualTo(dbc.currentRecord(), csns[0], baseDN1, value1); @@ -135,8 +122,8 @@ { Thread.sleep(200); } assertNull(cnIndexDB.getFirstRecord()); assertNull(cnIndexDB.getLastRecord()); assertNull(cnIndexDB.getOldestRecord()); assertNull(cnIndexDB.getNewestRecord()); assertEquals(cnIndexDB.count(), 0); } finally @@ -191,26 +178,13 @@ JEChangeNumberIndexDB cnIndexDB = null; try { TestCaseUtils.startServer(); int changelogPort = TestCaseUtils.findFreePort(); // configure a ReplicationServer. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, 100, null); replicationServer = new ReplicationServer(conf); replicationServer = newReplicationServer(); cnIndexDB = newCNIndexDB(replicationServer); cnIndexDB.setPurgeDelay(0); assertTrue(cnIndexDB.isEmpty()); // Prepare data to be stored in the db int cn1 = 3; int cn2 = 4; int cn3 = 5; String value1 = "value1"; String value2 = "value2"; String value3 = "value3"; @@ -222,14 +196,13 @@ CSN[] csns = newCSNs(1, 0, 3); // Add records cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0])); cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1])); cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2])); Thread.sleep(500); long cn1 = cnIndexDB.addRecord(new CNIndexRecord(value1, baseDN1, csns[0])); long cn2 = cnIndexDB.addRecord(new CNIndexRecord(value2, baseDN2, csns[1])); long cn3 = cnIndexDB.addRecord(new CNIndexRecord(value3, baseDN3, csns[2])); // Checks assertEquals(cnIndexDB.getFirstRecord().getChangeNumber(), cn1); assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3); assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1); assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3); assertEquals(cnIndexDB.count(), 3, "Db count"); assertFalse(cnIndexDB.isEmpty()); @@ -250,8 +223,8 @@ cnIndexDB.clear(); // Check the db is cleared. assertNull(cnIndexDB.getFirstRecord()); assertNull(cnIndexDB.getLastRecord()); assertNull(cnIndexDB.getOldestRecord()); assertNull(cnIndexDB.getNewestRecord()); assertEquals(cnIndexDB.count(), 0); assertTrue(cnIndexDB.isEmpty()); } @@ -263,6 +236,14 @@ } } private ReplicationServer newReplicationServer() throws Exception { TestCaseUtils.startServer(); final int port = TestCaseUtils.findFreePort(); return new ReplicationServer( new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null)) ; } private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB, long changeNumber) throws Exception { @@ -278,7 +259,7 @@ } private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor, int... sns) throws ChangelogException long... sns) throws ChangelogException { try {