opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,9 +41,7 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.types.*; import org.opends.server.util.ServerConstants; @@ -66,8 +64,8 @@ */ private String operationId; /** Iterator on the changelogDB database. */ private ChangelogDBIterator changelogDBIter = null; /** Cursor on the {@link ChangeNumberIndexDB}. */ private ChangeNumberIndexDBCursor cnIndexDBCursor; private boolean draftCompat = false; /** @@ -564,13 +562,14 @@ private String findCookie(final int startDraftCN) throws ChangelogException, DirectoryException { final ChangelogDB changelogDB = replicationServer.getChangelogDB(); final ChangeNumberIndexDB cnIndexDB = replicationServer.getChangeNumberIndexDB(); if (startDraftCN <= 1) { // Request filter DOES NOT contain any firstDraftCN // So we'll generate from the first DraftCN in the DraftCNdb if (changelogDB.isEmpty()) if (cnIndexDB.isEmpty()) { // FIXME JNR if we find a way to make draftCNDb.isEmpty() a non costly // operation, then I think we can move this check to the top of this @@ -579,21 +578,21 @@ return null; } final int firstDraftCN = changelogDB.getFirstDraftCN(); final int firstDraftCN = cnIndexDB.getFirstDraftCN(); final String crossDomainStartState = changelogDB.getPreviousCookie(firstDraftCN); changelogDBIter = changelogDB.generateIterator(firstDraftCN); cnIndexDB.getPreviousCookie(firstDraftCN); cnIndexDBCursor = cnIndexDB.getCursorFrom(firstDraftCN); return crossDomainStartState; } // Request filter DOES contain a startDraftCN // Read the draftCNDb to see whether it contains startDraftCN String 
crossDomainStartState = changelogDB.getPreviousCookie(startDraftCN); String crossDomainStartState = cnIndexDB.getPreviousCookie(startDraftCN); if (crossDomainStartState != null) { // found the provided startDraftCN, let's return it changelogDBIter = changelogDB.generateIterator(startDraftCN); cnIndexDBCursor = cnIndexDB.getCursorFrom(startDraftCN); return crossDomainStartState; } @@ -612,10 +611,10 @@ // the DB, let's use the lower limit. if (startDraftCN < firstDraftCN) { crossDomainStartState = changelogDB.getPreviousCookie(firstDraftCN); crossDomainStartState = cnIndexDB.getPreviousCookie(firstDraftCN); if (crossDomainStartState != null) { changelogDBIter = changelogDB.generateIterator(firstDraftCN); cnIndexDBCursor = cnIndexDB.getCursorFrom(firstDraftCN); return crossDomainStartState; } @@ -627,15 +626,15 @@ { // startDraftCN is between first and potential last and has never // been returned yet if (changelogDB.isEmpty()) if (cnIndexDB.isEmpty()) { isEndOfDraftCNReached = true; return null; } final int lastKey = changelogDB.getLastDraftCN(); crossDomainStartState = changelogDB.getPreviousCookie(lastKey); changelogDBIter = changelogDB.generateIterator(lastKey); final int lastKey = cnIndexDB.getLastDraftCN(); crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey); cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); return crossDomainStartState; // TODO:ECL ... ok we'll start from the end of the draftCNDb BUT ... 
@@ -909,10 +908,10 @@ private void releaseIterator() { if (this.changelogDBIter != null) if (this.cnIndexDBCursor != null) { this.changelogDBIter.close(); this.changelogDBIter = null; this.cnIndexDBCursor.close(); this.cnIndexDBCursor = null; } } @@ -1371,8 +1370,8 @@ // the next change from the DraftCN db CSN csnFromDraftCNDb = changelogDBIter.getCSN(); String dnFromDraftCNDb = changelogDBIter.getBaseDN(); CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN(); String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN(); if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " @@ -1387,10 +1386,10 @@ { if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " assigning draftCN=" + changelogDBIter.getDraftCN() + " assigning draftCN=" + cnIndexDBCursor.getDraftCN() + " to change=" + oldestChange); oldestChange.setDraftChangeNumber(changelogDBIter.getDraftCN()); oldestChange.setDraftChangeNumber(cnIndexDBCursor.getDraftCN()); return true; } @@ -1419,12 +1418,12 @@ + " will skip " + csnFromDraftCNDb + " and read next change from the DraftCNDb."); isEndOfDraftCNReached = !changelogDBIter.next(); isEndOfDraftCNReached = !cnIndexDBCursor.next(); if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " has skipped to " + " sn=" + changelogDBIter.getDraftCN() + " csn=" + changelogDBIter.getCSN() + " has skipped to " + " sn=" + cnIndexDBCursor.getDraftCN() + " csn=" + cnIndexDBCursor.getCSN() + " End of draftCNDb ?" + isEndOfDraftCNReached); } catch (ChangelogException e) @@ -1454,8 +1453,7 @@ // store in changelogDB the pair // (DraftCN of the current change, state before this change) ChangelogDB changelogDB = replicationServer.getChangelogDB(); changelogDB.add( replicationServer.getChangeNumberIndexDB().add( change.getDraftChangeNumber(), previousCookie.toString(), change.getBaseDN(), opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,7 +52,7 @@ import org.opends.server.replication.common.*; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.je.DbHandler; import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; @@ -146,21 +146,21 @@ /** * The handler of the changelog database, the database stores the relation * between a draft change number ('seqnum') and the associated cookie. * between a change number and the associated cookie. * <p> * Guarded by changelogDBLock * Guarded by cnIndexDBLock */ private ChangelogDB changelogDB; private ChangeNumberIndexDB cnIndexDB; /** * The last value generated of the draft change number. * <p> * Guarded by changelogDBLock * Guarded by cnIndexDBLock **/ private int lastGeneratedDraftCN = 0; /** Used for protecting changelogDB related state. */ private final Object changelogDBLock = new Object(); /** Used for protecting {@link ChangeNumberIndexDB} related state. */ private final Object cnIndexDBLock = new Object(); /** * The tracer object for the debug logger. 
@@ -705,11 +705,11 @@ eclwe.finalizeWorkflowElement(); } synchronized (changelogDBLock) synchronized (cnIndexDBLock) { if (changelogDB != null) if (cnIndexDB != null) { changelogDB.shutdown(); cnIndexDB.shutdown(); } } } @@ -830,7 +830,7 @@ listenThread.interrupt(); } // shutdown all the ChangelogCaches // shutdown all the replication domains for (ReplicationServerDomain domain : getReplicationServerDomains()) { domain.shutdown(); @@ -890,13 +890,13 @@ } } synchronized (changelogDBLock) synchronized (cnIndexDBLock) { if (changelogDB != null) if (cnIndexDB != null) { try { changelogDB.clear(baseDn); cnIndexDB.clear(baseDn); } catch (Exception ignored) { @@ -908,7 +908,7 @@ try { lastGeneratedDraftCN = changelogDB.getLastDraftCN(); lastGeneratedDraftCN = cnIndexDB.getLastDraftCN(); } catch (Exception ignored) { @@ -1369,13 +1369,13 @@ rsd.clearDbs(); } synchronized (changelogDBLock) synchronized (cnIndexDBLock) { if (changelogDB != null) if (cnIndexDB != null) { try { changelogDB.clear(); cnIndexDB.clear(); } catch (Exception ignored) { @@ -1387,7 +1387,7 @@ try { changelogDB.shutdown(); cnIndexDB.shutdown(); } catch (Exception ignored) { @@ -1398,7 +1398,7 @@ } lastGeneratedDraftCN = 0; changelogDB = null; cnIndexDB = null; } } } @@ -1623,24 +1623,25 @@ } /** * Get (or create) a handler on the ChangelogDB for external changelog. * Get (or create) a handler on the {@link ChangeNumberIndexDB} for external * changelog. * * @return the handler. * @throws DirectoryException * when needed. 
*/ public ChangelogDB getChangelogDB() throws DirectoryException public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException { synchronized (changelogDBLock) synchronized (cnIndexDBLock) { try { if (changelogDB == null) if (cnIndexDB == null) { changelogDB = new DraftCNDbHandler(this, this.dbEnv); cnIndexDB = new DraftCNDbHandler(this, this.dbEnv); lastGeneratedDraftCN = getLastDraftChangeNumber(); } return changelogDB; return cnIndexDB; } catch (Exception e) { @@ -1658,11 +1659,11 @@ */ public int getFirstDraftChangeNumber() { synchronized (changelogDBLock) synchronized (cnIndexDBLock) { if (changelogDB != null) if (cnIndexDB != null) { return changelogDB.getFirstDraftCN(); return cnIndexDB.getFirstDraftCN(); } return 0; } @@ -1674,11 +1675,11 @@ */ public int getLastDraftChangeNumber() { synchronized (changelogDBLock) synchronized (cnIndexDBLock) { if (changelogDB != null) if (cnIndexDB != null) { return changelogDB.getLastDraftCN(); return cnIndexDB.getLastDraftCN(); } return 0; } @@ -1690,7 +1691,7 @@ */ public int getNewDraftCN() { synchronized (changelogDBLock) synchronized (cnIndexDBLock) { return ++lastGeneratedDraftCN; } @@ -1730,10 +1731,9 @@ int lastDraftCN; boolean dbEmpty = false; long newestDate = 0; final ChangelogDB changelogDB = getChangelogDB(); final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); int firstDraftCN = changelogDB.getFirstDraftCN(); int firstDraftCN = cnIndexDB.getFirstDraftCN(); Map<String,ServerState> domainsServerStateForLastSeqnum = null; CSN csnForLastSeqnum = null; String domainForLastSeqnum = null; @@ -1745,11 +1745,11 @@ } else { lastDraftCN = changelogDB.getLastDraftCN(); lastDraftCN = cnIndexDB.getLastDraftCN(); // Get the generalized state associated with the current last DraftCN // and initializes from it the startStates table String lastSeqnumGenState = changelogDB.getPreviousCookie(lastDraftCN); String lastSeqnumGenState = cnIndexDB.getPreviousCookie(lastDraftCN); if ((lastSeqnumGenState != 
null) && (lastSeqnumGenState.length()>0)) { domainsServerStateForLastSeqnum = MultiDomainServerState. @@ -1757,13 +1757,13 @@ } // Get the CSN associated with the current last DraftCN csnForLastSeqnum = changelogDB.getCSN(lastDraftCN); csnForLastSeqnum = cnIndexDB.getCSN(lastDraftCN); // Get the domain associated with the current last DraftCN domainForLastSeqnum = changelogDB.getBaseDN(lastDraftCN); domainForLastSeqnum = cnIndexDB.getBaseDN(lastDraftCN); } // Domain by domain long newestDate = 0; for (ReplicationServerDomain rsd : getReplicationServerDomains()) { if (contains(excludedBaseDNs, rsd.getBaseDn())) @@ -1810,6 +1810,7 @@ if ((ec>0) && (firstDraftCN==0)) firstDraftCN = 1; } if (dbEmpty) { // The database was empty, just keep increasing numbers since last time opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java @@ -29,13 +29,18 @@ import org.opends.server.replication.common.CSN; /** * This class stores the changelog information into a database. * This class stores an index of all the changes seen by this server. The index * is sorted by a global ordering as defined in the CSN class. The index links a * <code>changeNumber</code> to the corresponding {@link CSN}. The {@link CSN} * then links to a corresponding change in one of the {@link ReplicaDB}s. * * @see <a href= * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" * >OpenDJ Domain Names</a> for more information about the changelog. * @see <a href= "http://tools.ietf.org/html/draft-good-ldap-changelog-04" * >draft-good-ldap-changelog-04</a> for more information about the changeNumber. */ public interface ChangelogDB extends Runnable public interface ChangeNumberIndexDB extends Runnable { /** @@ -98,9 +103,9 @@ void add(int draftCN, String previousCookie, String baseDN, CSN csn); /** * Generate a new {@link ChangelogDBIterator} that allows to browse the db * managed by this dbHandler and starting at the position defined by a given * changeNumber. * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the * db managed by this dbHandler and starting at the position defined by a * given changeNumber. * * @param startDraftCN * The position where the iterator must start. @@ -110,7 +115,7 @@ * @throws ChangelogException * if a database problem happened. */ ChangelogDBIterator generateIterator(int startDraftCN) ChangeNumberIndexDBCursor getCursorFrom(int startDraftCN) throws ChangelogException; /** opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDBIterator.java @@ -35,7 +35,7 @@ * ChangelogDBIterator must be closed to release all the resources into the * database. */ public interface ChangelogDBIterator extends Closeable public interface ChangeNumberIndexDBCursor extends Closeable { /** opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -44,9 +44,7 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.je.DraftCNDB.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; @@ -62,13 +60,13 @@ * server in the topology. * It is responsible for efficiently saving the updates that is received from * each master server into stable storage. * This class is also able to generate a {@link ChangelogDBIterator} that can be * used to read all changes from a given draft ChangeNumber. * This class is also able to generate a {@link ChangeNumberIndexDBCursor} that * can be used to read all changes from a given draft ChangeNumber. * <p> * This class publishes some monitoring information below <code> * cn=monitor</code>. */ public class DraftCNDbHandler implements ChangelogDB public class DraftCNDbHandler implements ChangeNumberIndexDB { /** * The tracer object for the debug logger. @@ -217,7 +215,7 @@ /** {@inheritDoc} */ @Override public ChangelogDBIterator generateIterator(int startDraftCN) public ChangeNumberIndexDBCursor getCursorFrom(int startDraftCN) throws ChangelogException { return new DraftCNDbIterator(db, startDraftCN); opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
@@ -30,10 +30,8 @@ import org.opends.messages.Message; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.je.DraftCNDB .DraftCNDBCursor; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.je.DraftCNDB.*; import org.opends.server.types.DebugLogLevel; import static org.opends.server.loggers.debug.DebugLogger.*; @@ -42,7 +40,7 @@ * This class allows to iterate through the changes received from a given * LDAP Server Identifier. */ public class DraftCNDbIterator implements ChangelogDBIterator public class DraftCNDbIterator implements ChangeNumberIndexDBCursor { private static final DebugTracer TRACER = getTracer(); private DraftCNDBCursor draftCNDbCursor; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -1656,7 +1656,7 @@ LDAPFilter.decode(filterString), attributes); } /** * Test parallel simultaneous psearch with different filters. */ @@ -2678,22 +2678,23 @@ { String tn = "ECLPurgeDraftCNDbAfterChangelogClear"; debugInfo(tn, "Starting test\n\n"); DraftCNDbHandler draftdb = (DraftCNDbHandler) replicationServer.getChangeNumberIndexDB(); assertEquals(draftdb.count(), 8); draftdb.setPurgeDelay(1000); // Now clear the changelog db this.replicationServer.clearDb(); // Expect changes purged from the changelog db to be sometimes // also purged from the DraftCNDb. while (!draftdb.isEmpty()) { DraftCNDbHandler draftdb = (DraftCNDbHandler) replicationServer.getChangelogDB(); assertEquals(draftdb.count(), 8); draftdb.setPurgeDelay(1000); // Now Purge the changelog db this.replicationServer.clearDb(); // Expect changes purged from the changelog db to be sometimes // also purged from the DraftCNDb. while (!draftdb.isEmpty()) { debugInfo(tn, "draftdb.count="+draftdb.count()); sleep(200); } debugInfo(tn, "draftdb.count=" + draftdb.count()); sleep(200); } debugInfo(tn, "Ending test with success"); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -36,7 +36,7 @@ import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; import org.opends.server.util.StaticUtils; @@ -247,14 +247,14 @@ assertEquals(handler.getPreviousCookie(sn2),value2); assertEquals(handler.getPreviousCookie(sn3),value3); ChangelogDBIterator it = handler.generateIterator(sn1); assertIteratorReadsInOrder(it, sn1, sn2, sn3); ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(sn1); assertCursorReadsInOrder(cursor, sn1, sn2, sn3); it = handler.generateIterator(sn2); assertIteratorReadsInOrder(it, sn2, sn3); cursor = handler.getCursorFrom(sn2); assertCursorReadsInOrder(cursor, sn2, sn3); it = handler.generateIterator(sn3); assertIteratorReadsInOrder(it, sn3); cursor = handler.getCursorFrom(sn3); assertCursorReadsInOrder(cursor, sn3); handler.clear(); @@ -274,21 +274,21 @@ } } private void assertIteratorReadsInOrder(ChangelogDBIterator it, int... sns) throws ChangelogException private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor, int... sns) throws ChangelogException { try { for (int i = 0; i < sns.length; i++) { assertEquals(it.getDraftCN(), sns[i]); assertEquals(cursor.getDraftCN(), sns[i]); final boolean isNotLast = i + 1 < sns.length; assertEquals(it.next(), isNotLast); assertEquals(cursor.next(), isNotLast); } } finally { it.close(); cursor.close(); } } }