opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -114,24 +114,26 @@ long getDomainChangesCount(DN baseDN); /** * Returns the FIRST {@link CSN}s of each serverId for the specified * Returns the oldest {@link CSN}s of each serverId for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a {serverId => FIRST CSN} Map * @return a {serverId => oldest CSN} Map. If a replica DB is empty or closed, * the oldest CSN will be null for that replica. */ Map<Integer, CSN> getDomainFirstCSNs(DN baseDN); Map<Integer, CSN> getDomainOldestCSNs(DN baseDN); /** * Returns the LAST {@link CSN}s of each serverId for the specified * Returns the newest {@link CSN}s of each serverId for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a {serverId => LAST CSN} Map * @return a {serverId => newest CSN} Map. If a replica DB is empty or closed, * the newest CSN will be null for that replica. */ Map<Integer, CSN> getDomainLastCSNs(DN baseDN); Map<Integer, CSN> getDomainNewestCSNs(DN baseDN); /** * Retrieves the latest trim date for the specified replication domain. opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -111,8 +111,8 @@ private int queueByteSize = 0; private ReplicationDB db; private CSN firstChange; private CSN lastChange; private CSN oldestCSN; private CSN newestCSN; private int serverId; private DN baseDN; private DbMonitorProvider dbMonitor = new DbMonitorProvider(); @@ -153,8 +153,8 @@ queueLowmarkBytes = 200 * queueLowmark; queueHimarkBytes = 200 * queueLowmark; db = new ReplicationDB(id, baseDN, replicationServer, dbenv); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); oldestCSN = db.readOldestCSN(); newestCSN = db.readNewestCSN(); thread = new DirectoryThread(this, "Replication server RS(" + replicationServer.getServerId() + ") changelog checkpointer for Replica DS(" + id @@ -198,13 +198,13 @@ queueByteSize += update.size(); msgQueue.add(update); if (lastChange == null || lastChange.older(update.getCSN())) if (newestCSN == null || newestCSN.older(update.getCSN())) { lastChange = update.getCSN(); newestCSN = update.getCSN(); } if (firstChange == null) if (oldestCSN == null) { firstChange = update.getCSN(); oldestCSN = update.getCSN(); } } } @@ -225,21 +225,23 @@ } /** * Get the firstChange. * @return Returns the firstChange. * Get the oldest CSN that has not been purged yet. * * @return the oldest CSN that has not been purged yet. */ public CSN getFirstChange() public CSN getOldestCSN() { return firstChange; return oldestCSN; } /** * Get the lastChange. * @return Returns the lastChange. * Get the newest CSN that has not been purged yet. * * @return the newest CSN that has not been purged yet. */ public CSN getLastChange() public CSN getNewestCSN() { return lastChange; return newestCSN; } /** @@ -249,9 +251,9 @@ */ public long getChangesCount() { if (lastChange != null && firstChange != null) if (newestCSN != null && oldestCSN != null) { return lastChange.getSeqnum() - firstChange.getSeqnum() + 1; return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1; } return 0; } @@ -453,13 +455,13 @@ return; } if (!csn.equals(lastChange) && csn.older(trimDate)) if (!csn.equals(newestCSN) && csn.older(trimDate)) { cursor.delete(); } else { firstChange = csn; oldestCSN = csn; return; } } @@ -532,13 +534,13 @@ String.valueOf(serverId))); attributes.add(Attributes.create("domain-name", baseDN.toNormalizedString())); if (firstChange != null) if (oldestCSN != null) { attributes.add(Attributes.create("first-change", encode(firstChange))); attributes.add(Attributes.create("first-change", encode(oldestCSN))); } if (lastChange != null) if (newestCSN != null) { attributes.add(Attributes.create("last-change", encode(lastChange))); attributes.add(Attributes.create("last-change", encode(newestCSN))); } attributes.add( Attributes.create("queue-size", String.valueOf(msgQueue.size()))); @@ -581,7 +583,7 @@ @Override public String toString() { return baseDN + " " + serverId + " " + firstChange + " " + lastChange; return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN; } /** @@ -606,8 +608,8 @@ queueByteSize = 0; db.clear(); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); oldestCSN = db.readOldestCSN(); newestCSN = db.readNewestCSN(); } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -270,28 +270,28 @@ /** {@inheritDoc} */ @Override public Map<Integer, CSN> getDomainFirstCSNs(DN baseDN) public Map<Integer, CSN> getDomainOldestCSNs(DN baseDN) { final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); final Map<Integer, CSN> results = new HashMap<Integer, CSN>(domainMap.size()); for (DbHandler dbHandler : domainMap.values()) { results.put(dbHandler.getServerId(), dbHandler.getFirstChange()); results.put(dbHandler.getServerId(), dbHandler.getOldestCSN()); } return results; } /** {@inheritDoc} */ @Override public Map<Integer, CSN> getDomainLastCSNs(DN baseDN) public Map<Integer, CSN> getDomainNewestCSNs(DN baseDN) { final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); final Map<Integer, CSN> results = new HashMap<Integer, CSN>(domainMap.size()); for (DbHandler dbHandler : domainMap.values()) { results.put(dbHandler.getServerId(), dbHandler.getLastChange()); results.put(dbHandler.getServerId(), dbHandler.getNewestCSN()); } return results; } opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -326,11 +326,11 @@ } /** * Read the first Change from the database. * Read the oldest CSN present in the database. * * @return the first CSN. * @return the oldest CSN in the DB, null if the DB is empty or closed */ public CSN readFirstChange() public CSN readOldestCSN() { dbCloseLock.readLock().lock(); @@ -383,11 +383,11 @@ /** * Read the last Change from the database. * Read the newest CSN present in the database. * * @return the last CSN. * @return the newest CSN in the DB, null if the DB is empty or closed */ public CSN readLastChange() public CSN readNewestCSN() { dbCloseLock.readLock().lock(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -121,8 +121,8 @@ assertNotFound(handler, csn5); // Test first and last assertEquals(csn1, handler.getFirstChange()); assertEquals(csn3, handler.getLastChange()); assertEquals(csn1, handler.getOldestCSN()); assertEquals(csn3, handler.getNewestCSN()); //-- // Cursor tests with db and memory queue populated @@ -144,9 +144,9 @@ int count = 300; // wait at most 60 seconds while (!purged && (count > 0)) { CSN firstChange = handler.getFirstChange(); CSN lastChange = handler.getLastChange(); if (!firstChange.equals(csn4) || !lastChange.equals(csn4)) CSN oldestCSN = handler.getOldestCSN(); CSN newestCSN = handler.getNewestCSN(); if (!oldestCSN.equals(csn4) || !newestCSN.equals(csn4)) { TestCaseUtils.sleep(100); } else @@ -266,15 +266,15 @@ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, csn3, "uid")); // Check they are here assertEquals(csn1, handler.getFirstChange()); assertEquals(csn3, handler.getLastChange()); assertEquals(csn1, handler.getOldestCSN()); assertEquals(csn3, handler.getNewestCSN()); // Clear ... handler.clear(); // Check the db is cleared. assertEquals(null, handler.getFirstChange()); assertEquals(null, handler.getLastChange()); assertEquals(null, handler.getOldestCSN()); assertEquals(null, handler.getNewestCSN()); } finally { @@ -365,10 +365,10 @@ handler.flush(); // Test first and last CSN csn1 = handler.getFirstChange(); assertEquals(csn1, csnArray[1], "First change"); CSN csnLast = handler.getLastChange(); assertEquals(csnLast, csnArray[max], "Last change"); CSN csn1 = handler.getOldestCSN(); assertEquals(csn1, csnArray[1], "Wrong oldest CSN"); CSN csnLast = handler.getNewestCSN(); assertEquals(csnLast, csnArray[max], "Wrong newest CSN"); // Test count in different subcases trying to handle all special cases // regarding the 'counter' record and 'count' algorithm @@ -448,10 +448,10 @@ handler.setCounterWindowSize(counterWindow); // Test first and last csn1 = handler.getFirstChange(); assertEquals(csn1, csnArray[1], "First change"); csnLast = handler.getLastChange(); assertEquals(csnLast, csnArray[max], "Last change"); csn1 = handler.getOldestCSN(); assertEquals(csn1, csnArray[1], "Wrong oldest CSN"); csnLast = handler.getNewestCSN(); assertEquals(csnLast, csnArray[max], "Wrong newest CSN"); testcase="FROM our first generated change TO now (> newest change in the db)"; actualCnt = handler.getCount(csnArray[1], newerThanLast); @@ -469,10 +469,10 @@ handler.flush(); // Test first and last csn1 = handler.getFirstChange(); assertEquals(csn1, csnArray[1], "First change"); csnLast = handler.getLastChange(); assertEquals(csnLast, csnArray[2 * max], "Last change"); csn1 = handler.getOldestCSN(); assertEquals(csn1, csnArray[1], "Wrong oldest CSN"); csnLast = handler.getNewestCSN(); assertEquals(csnLast, csnArray[2 * max], "Wrong newest CSN"); testcase="FROM our first generated change TO now (> newest change in the db)"; actualCnt = handler.getCount(csnArray[1], newerThanLast); @@ -487,16 +487,17 @@ debugInfo(tn,testcase + " After purge, total count=" + totalCount); testcase="AFTER PURGE (first, last)="; debugInfo(tn,testcase + handler.getFirstChange() + handler.getLastChange()); assertEquals(handler.getLastChange(), csnArray[2*max], "Last="); debugInfo(tn, testcase + handler.getOldestCSN() + handler.getNewestCSN()); assertEquals(handler.getNewestCSN(), csnArray[2*max], "Newest="); testcase="AFTER PURGE "; actualCnt = handler.getCount(csnArray[1], newerThanLast); int expectedCnt; if (totalCount>1) { expectedCnt = ((handler.getLastChange().getSeqnum() - handler.getFirstChange().getSeqnum() + 1)/2)+1; final int newestSeqnum = handler.getNewestCSN().getSeqnum(); final int oldestSeqnum = handler.getOldestCSN().getSeqnum(); expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1; } else { @@ -510,8 +511,8 @@ handler.clear(); // Check the db is cleared. assertEquals(null, handler.getFirstChange()); assertEquals(null, handler.getLastChange()); assertEquals(null, handler.getOldestCSN()); assertEquals(null, handler.getNewestCSN()); debugInfo(tn,"Success"); } finally