opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -598,22 +598,23 @@ return null; } final String crossDomainStartState = oldestRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestRecord.getChangeNumber()); return crossDomainStartState; cnIndexDBCursor = getCursorFrom(cnIndexDB, oldestRecord.getChangeNumber()); return oldestRecord.getPreviousCookie(); } // Request filter DOES contain a startChangeNumber // Read the CNIndexDB to see whether it contains startChangeNumber CNIndexRecord startRecord = cnIndexDB.getRecord(startChangeNumber); DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(startChangeNumber); final CNIndexRecord startRecord = cursor.getRecord(); if (startRecord != null) { // found the provided startChangeNumber, let's return it final String crossDomainStartState = startRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber); return crossDomainStartState; cnIndexDBCursor = cursor; return startRecord.getPreviousCookie(); } close(cursor); // startChangeNumber provided in the request IS NOT in the CNIndexDB @@ -630,17 +631,18 @@ // the DB, let's use the lower limit. if (startChangeNumber < oldestChangeNumber) { CNIndexRecord oldestRecord = cnIndexDB.getRecord(oldestChangeNumber); cursor = cnIndexDB.getCursorFrom(oldestChangeNumber); final CNIndexRecord oldestRecord = cursor.getRecord(); if (oldestRecord == null) { // This should not happen close(cursor); isEndOfCNIndexDBReached = true; return null; } final String crossDomainStartState = oldestRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestChangeNumber); return crossDomainStartState; cnIndexDBCursor = cursor; return oldestRecord.getPreviousCookie(); } else if (startChangeNumber <= newestChangeNumber) { @@ -653,9 +655,9 @@ return null; } final String crossDomainStartState = newestRecord.getPreviousCookie(); cnIndexDBCursor = cnIndexDB.getCursorFrom(newestRecord.getChangeNumber()); return crossDomainStartState; cnIndexDBCursor = getCursorFrom(cnIndexDB, newestRecord.getChangeNumber()); return newestRecord.getPreviousCookie(); // TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ... // this may be very long. Work on perf improvement here. @@ -665,6 +667,19 @@ throw new DirectoryException(ResultCode.SUCCESS, Message.raw("")); } private DBCursor<CNIndexRecord> getCursorFrom(ChangeNumberIndexDB cnIndexDB, long startChangeNumber) throws ChangelogException { DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(startChangeNumber); if (cursor.getRecord() == null) { close(cursor); throw new ChangelogException(Message.raw("Change Number " + startChangeNumber + " is not available in the Changelog")); } return cursor; } /** * Initialize the context for each domain. * @param providedCookie the provided generalized state opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -51,17 +51,6 @@ long getLastGeneratedChangeNumber(); /** * Get the record associated to a provided change number. * * @param changeNumber * the provided change number. * @return the {@link CNIndexRecord}, null when none. * @throws ChangelogException * if a database problem occurs. */ CNIndexRecord getRecord(long changeNumber) throws ChangelogException; /** * Get the oldest record stored in this DB. * * @return Returns the oldest {@link CNIndexRecord} in this DB, null when the opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -32,7 +32,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.opends.messages.MessageBuilder; import org.opends.server.admin.std.server.MonitorProviderCfg; @@ -220,24 +219,6 @@ return getNewestRecord() == null; } /** * Get a read cursor on the database from a provided key. The cursor MUST be * closed after use. * <p> * This method is only used by unit tests. * * @param startChangeNumber * The change number from where to start. * @return the new cursor. * @throws ChangelogException * if a database problem occurs. */ DraftCNDBCursor getReadCursor(long startChangeNumber) throws ChangelogException { return db.openReadCursor(startChangeNumber); } /** {@inheritDoc} */ @Override public DBCursor<CNIndexRecord> getCursorFrom(long startChangeNumber) @@ -564,49 +545,4 @@ newestChangeNumber = getChangeNumber(db.readLastRecord()); } private ReentrantLock lock = new ReentrantLock(); /** * Tests if the current thread has the lock on this object. * @return True if the current thread has the lock. */ public boolean hasLock() { return lock.getHoldCount() > 0; } /** * Takes the lock on this object (blocking until lock can be acquired). * @throws InterruptedException If interrupted. */ public void lock() throws InterruptedException { lock.lockInterruptibly(); } /** * Releases the lock on this object. */ public void release() { lock.unlock(); } /** {@inheritDoc} */ @Override public CNIndexRecord getRecord(long changeNumber) throws ChangelogException { DraftCNDBCursor cursor = null; try { cursor = db.openReadCursor(changeNumber); return cursor.currentRecord(); } finally { close(cursor); } } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
@@ -27,7 +27,6 @@ */ package org.opends.server.replication.server.changelog.je; import org.opends.messages.Message; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.je.DraftCNDB.*; @@ -59,11 +58,6 @@ throws ChangelogException { draftCNDbCursor = db.openReadCursor(startChangeNumber); if (draftCNDbCursor.currentRecord() == null) { throw new ChangelogException(Message.raw("Change Number " + startChangeNumber + " is not available in the Changelog")); } } /** {@inheritDoc} */ opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -38,7 +38,6 @@ import org.opends.server.replication.server.changelog.api.CNIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; import org.testng.annotations.Test; @@ -96,23 +95,19 @@ assertEquals(oldestCN, cn1); assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3); DraftCNDBCursor dbc = cnIndexDB.getReadCursor(oldestCN); DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN); try { assertEqualTo(dbc.currentRecord(), csns[0], baseDN1, value1); assertTrue(dbc.toString().length() != 0); assertTrue(dbc.next()); assertEqualTo(dbc.currentRecord(), csns[1], baseDN2, value2); assertTrue(dbc.next()); assertEqualTo(dbc.currentRecord(), csns[2], baseDN3, value3); assertFalse(dbc.next()); assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1); assertTrue(cursor.next()); assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2); assertTrue(cursor.next()); assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3); assertFalse(cursor.next()); } finally { StaticUtils.close(dbc); StaticUtils.close(cursor); } // Now test that the trimming thread does its job => start it @@ -136,11 +131,11 @@ } } private void assertEqualTo(CNIndexRecord data, CSN csn, DN baseDN, String cookie) private void assertEqualTo(CNIndexRecord record, CSN csn, DN baseDN, String cookie) { assertEquals(data.getCSN(), csn); assertEquals(data.getBaseDN(), baseDN); assertEquals(data.getPreviousCookie(), cookie); assertEquals(record.getCSN(), csn); assertEquals(record.getBaseDN(), baseDN); assertEquals(record.getPreviousCookie(), cookie); } private JEChangeNumberIndexDB newCNIndexDB(ReplicationServer rs) throws Exception @@ -256,14 +251,14 @@ } private void assertCursorReadsInOrder(DBCursor<CNIndexRecord> cursor, long... sns) throws ChangelogException long... cns) throws ChangelogException { try { for (int i = 0; i < sns.length; i++) for (int i = 0; i < cns.length; i++) { assertEquals(cursor.getRecord().getChangeNumber(), sns[i]); final boolean isNotLast = i + 1 < sns.length; assertEquals(cursor.getRecord().getChangeNumber(), cns[i]); final boolean isNotLast = i + 1 < cns.length; assertEquals(cursor.next(), isNotLast); } }