opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -40,7 +40,10 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.api.CNIndexRecord; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.*; import org.opends.server.util.ServerConstants; @@ -59,7 +62,11 @@ { private static int UNDEFINED_PHASE = 0; /** TODO JNR. */ /** * Constant used to indicate the handler is in the ECL initialization phase. * * @see #getSearchPhase() */ public static int INIT_PHASE = 1; private static int PERSISTENT_PHASE = 2; @@ -70,7 +77,7 @@ private String operationId; /** Cursor on the {@link ChangeNumberIndexDB}. */ private ChangeNumberIndexDBCursor cnIndexDBCursor; private DBCursor<CNIndexRecord> cnIndexDBCursor; private boolean draftCompat = false; /** opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -41,7 +41,7 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; @@ -296,14 +296,14 @@ * restart as usual * load this change on the delayList */ ReplicaDBCursor cursor = null; DBCursor<UpdateMsg> cursor = null; try { // fill the lateQueue cursor = replicationServerDomain.getCursorFrom(serverState); while (cursor.next() && isLateQueueBelowThreshold()) { lateQueue.add(cursor.getChange()); lateQueue.add(cursor.getRecord()); } } catch (ChangelogException e) @@ -454,12 +454,12 @@ private CSN findOldestCSNFromReplicaDBs() { ReplicaDBCursor cursor = null; DBCursor<UpdateMsg> cursor = null; try { cursor = replicationServerDomain.getCursorFrom(serverState); cursor.next(); return cursor.getChange().getCSN(); return cursor.getRecord().getCSN(); } catch (Exception e) { opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -51,7 +51,7 @@ import org.opends.server.replication.plugin.ReplicationServerListener; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.*; import org.opends.server.util.*; @@ -630,14 +630,14 @@ return; } ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN); DBCursor<UpdateMsg> cursor = rsDomain.getCursorFrom(previousCSN); try { int lookthroughCount = 0; // Walk through the changes cursor.next(); // first try to advance the cursor while (cursor.getChange() != null) while (cursor.getRecord() != null) { if (exportConfig != null && exportConfig.isCancelled()) { // abort if cancelled @@ -648,7 +648,7 @@ break; } lookthroughCount++; writeChange(cursor.getChange(), ldifWriter, searchOperation, writeChange(cursor.getRecord(), ldifWriter, searchOperation, rsDomain.getBaseDN(), exportConfig != null); cursor.next(); } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,7 @@ import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.types.*; @@ -1271,19 +1271,19 @@ /** * Creates and returns a cursor across this replication domain. * <p> * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor * to the next available record. * Client code must call {@link DBCursor#next()} to advance the cursor to the * next available record. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * {@link DBCursor#close()} method to free the resources and locks used by the * cursor. * * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * @return a non null {@link ReplicaDBCursor} * @return a non null {@link DBCursor} * @see ReplicationDomainDB#getCursorFrom(DN, CSN) */ public ReplicaDBCursor getCursorFrom(CSN startAfterCSN) public DBCursor<UpdateMsg> getCursorFrom(CSN startAfterCSN) { return domainDB.getCursorFrom(baseDN, startAfterCSN); } @@ -1291,20 +1291,20 @@ /** * Creates and returns a cursor across this replication domain. * <p> * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor * to the next available record. * Client code must call {@link DBCursor#next()} to advance the cursor to the * next available record. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * {@link DBCursor#close()} method to free the resources and locks used by the * cursor. * * @param startAfterServerState * Starting point for the replicaDB cursors. If null, start from the * oldest CSN * @return a non null {@link ReplicaDBCursor} going from oldest to newest CSN * @return a non null {@link DBCursor} going from oldest to newest CSN * @see ReplicationDomainDB#getCursorFrom(DN, ServerState) */ public ReplicaDBCursor getCursorFrom(ServerState startAfterServerState) public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) { return domainDB.getCursorFrom(baseDN, startAfterServerState); } opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -99,9 +99,8 @@ long addRecord(CNIndexRecord record) throws ChangelogException; /** * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the * db managed by this object and starting at the position defined by a given * changeNumber. * Generate a new {@link DBCursor} that allows to browse the db managed by * this object and starting at the position defined by a given changeNumber. * * @param startChangeNumber * The position where the iterator must start. @@ -111,7 +110,7 @@ * @throws ChangelogException * if a database problem occurs. */ ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber) DBCursor<CNIndexRecord> getCursorFrom(long startChangeNumber) throws ChangelogException; } opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java @@ -29,19 +29,22 @@ import java.io.Closeable; /** * Iterator into the changelog database. Once it is not used anymore, a * ChangelogDBIterator must be closed to release all the resources into the * Generic cursor interface into the changelog database. Once it is not used * anymore, a cursor must be closed to release all the resources into the * database. * * @param <T> * type of the record being returned */ public interface ChangeNumberIndexDBCursor extends Closeable public interface DBCursor<T> extends Closeable { /** * Getter for the record. * Getter for the current record. * * @return The current {@link CNIndexRecord}. * @return The current record. */ CNIndexRecord getRecord(); T getRecord(); /** * Skip to the next record of the database. opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
File was deleted opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -162,36 +162,35 @@ long getCount(DN baseDN, int serverId, CSN from, CSN to); /** * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the * specified replication domain, with all cursors starting after the provided * CSN. * Generates a {@link DBCursor} across all the replicaDBs for the specified * replication domain, with all cursors starting after the provided CSN. * <p> * The cursor is already advanced to the record after startAfterCSN. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * {@link DBCursor#close()} method to free the resources and locks used by the * cursor. * * @param baseDN * the replication domain baseDN * @param startAfterCSN * Starting point for each ReplicaDB cursor. If null, start from the * oldest CSN for each ReplicaDB cursor. * @return a non null {@link ReplicaDBCursor} * @return a non null {@link DBCursor} * @see #getCursorFrom(DN, ServerState) */ ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN); DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN); /** * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the * specified replication domain starting after the provided * {@link ServerState} for each replicaDBs. * Generates a {@link DBCursor} across all the replicaDBs for the specified * replication domain starting after the provided {@link ServerState} for each * replicaDBs. * <p> * The cursor is already advanced to the records after the serverState. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * {@link DBCursor#close()} method to free the resources and locks used by the * cursor. * * @param baseDN * the replication domain baseDN @@ -199,10 +198,11 @@ * Starting point for each ReplicaDB cursor. If any CSN for a * replicaDB is null, then start from the oldest CSN for this * replicaDB * @return a non null {@link ReplicaDBCursor} * @return a non null {@link DBCursor} * @see #getCursorFrom(DN, CSN) */ ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState); DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterServerState); /** * for the specified serverId and replication domain. opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -59,8 +59,8 @@ * This class is used for managing the replicationServer database for each * server in the topology. It is responsible for efficiently saving the updates * that is received from each master server into stable storage. This class is * also able to generate a {@link ChangeNumberIndexDBCursor} that can be used to * read all changes from a given change number. * also able to generate a {@link DBCursor} that can be used to read all changes * from a given change number. * <p> * This class publishes some monitoring information below <code> * cn=monitor</code>. @@ -240,7 +240,7 @@ /** {@inheritDoc} */ @Override public ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber) public DBCursor<CNIndexRecord> getCursorFrom(long startChangeNumber) throws ChangelogException { return new JEChangeNumberIndexDBCursor(db, startChangeNumber); opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
@@ -39,7 +39,7 @@ * This class allows to iterate through the changes received from a given * LDAP Server Identifier. */ public class JEChangeNumberIndexDBCursor implements ChangeNumberIndexDBCursor public class JEChangeNumberIndexDBCursor implements DBCursor<CNIndexRecord> { private static final DebugTracer TRACER = getTracer(); private DraftCNDBCursor draftCNDbCursor; @@ -116,4 +116,5 @@ { close(); } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -54,21 +54,31 @@ { /** * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a * replication domain, advancing from the oldest to the newest change cross * {@link DBCursor} implementation that iterates across all the ReplicaDBs of * a replication domain, advancing from the oldest to the newest change cross * all replicaDBs. */ private final class CrossReplicaDBCursor implements ReplicaDBCursor private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg> { private final DN baseDN; private UpdateMsg currentChange; /** * The cursors are sorted based on the current change of each cursor to * consider the next change across all replicaDBs. */ private final NavigableSet<ReplicaDBCursor> cursors = new TreeSet<ReplicaDBCursor>(); private final DN baseDN; private final NavigableSet<DBCursor<UpdateMsg>> cursors = new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() { @Override public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) { final CSN csn1 = o1.getRecord().getCSN(); final CSN csn2 = o2.getRecord().getCSN(); return CSN.compare(csn1, csn2); } }); public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) { @@ -81,7 +91,7 @@ } } private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); @@ -89,7 +99,8 @@ { try { ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; } @@ -112,16 +123,16 @@ // To keep consistent the cursors' order in the SortedSet, it is necessary // to remove and eventually add again a cursor (after moving it forward). final ReplicaDBCursor cursor = cursors.pollFirst(); currentChange = cursor.getChange(); final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); currentChange = cursor.getRecord(); cursor.next(); addCursorIfNotEmpty(cursor); return true; } void addCursorIfNotEmpty(ReplicaDBCursor cursor) void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor) { if (cursor.getChange() != null) if (cursor.getRecord() != null) { cursors.add(cursor); } @@ -132,7 +143,7 @@ } @Override public UpdateMsg getChange() public UpdateMsg getRecord() { return currentChange; } @@ -143,15 +154,6 @@ StaticUtils.close(cursors); } @Override public int compareTo(ReplicaDBCursor o) { final CSN csn1 = getChange().getCSN(); final CSN csn2 = o.getChange().getCSN(); return CSN.compare(csn1, csn2); } /** {@inheritDoc} */ @Override public String toString() @@ -184,27 +186,18 @@ /** The local replication server. */ private final ReplicationServer replicationServer; private static final ReplicaDBCursor EMPTY_CURSOR = new ReplicaDBCursor() private static final DBCursor<UpdateMsg> EMPTY_CURSOR = new DBCursor<UpdateMsg>() { @Override public int compareTo(ReplicaDBCursor o) { if (o == null) { throw new NullPointerException(); // as per javadoc } return o == this ? 0 : -1; // equal to self, but less than all the rest } @Override public boolean next() { return false; } @Override public UpdateMsg getChange() public UpdateMsg getRecord() { return null; } @@ -218,7 +211,7 @@ @Override public String toString() { return "EmptyReplicaDBCursor"; return "EmptyDBCursor<UpdateMsg>"; } }; @@ -670,7 +663,7 @@ /** {@inheritDoc} */ @Override public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN) public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN) { // Builds a new serverState for all the serverIds in the replication domain // to ensure we get cursors starting after the provided CSN. @@ -679,7 +672,7 @@ /** {@inheritDoc} */ @Override public ReplicaDBCursor getCursorFrom(DN baseDN, public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterServerState) { return new CrossReplicaDBCursor(baseDN, startAfterServerState); opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -43,7 +43,7 @@ import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; @@ -58,11 +58,13 @@ /** * This class is used for managing the replicationServer database for each * server in the topology. * <p> * It is responsible for efficiently saving the updates that is received from * each master server into stable storage. * This class is also able to generate a {@link ReplicaDBCursor} that can be * used to read all changes from a given {@link CSN}. * * <p> * This class is also able to generate a {@link DBCursor} that can be used to * read all changes from a given {@link CSN}. * <p> * This class publish some monitoring information below cn=monitor. */ public class JEReplicaDB implements Runnable @@ -259,19 +261,18 @@ } /** * Generate a new {@link ReplicaDBCursor} that allows to browse the db managed * by this ReplicaDB and starting at the position defined by a given CSN. * Generate a new {@link DBCursor} that allows to browse the db managed by * this ReplicaDB and starting at the position defined by a given CSN. * * @param startAfterCSN * The position where the cursor must start. If null, start from the * oldest CSN * @return a new {@link ReplicaDBCursor} that allows to browse the db managed * by this ReplicaDB and starting at the position defined by a given * CSN. * @return a new {@link DBCursor} that allows to browse the db managed by this * ReplicaDB and starting at the position defined by a given CSN. * @throws ChangelogException * if a database problem happened. */ public ReplicaDBCursor generateCursorFrom(CSN startAfterCSN) public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException { if (startAfterCSN == null) opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -30,13 +30,13 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; /** * Berkeley DB JE implementation of {@link ReplicaDBCursor}. * Berkeley DB JE implementation of {@link DBCursor}. */ public class JEReplicaDBCursor implements ReplicaDBCursor public class JEReplicaDBCursor implements DBCursor<UpdateMsg> { private UpdateMsg currentChange; private ReplServerDBCursor cursor; @@ -87,7 +87,7 @@ /** {@inheritDoc} */ @Override public UpdateMsg getChange() public UpdateMsg getRecord() { return currentChange; } @@ -152,16 +152,6 @@ /** {@inheritDoc} */ @Override public int compareTo(ReplicaDBCursor o) { final CSN csn1 = getChange().getCSN(); final CSN csn2 = o.getChange().getCSN(); return CSN.compare(csn1, csn2); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " currentChange=" + currentChange + "" opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -36,8 +36,8 @@ import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.CNIndexRecord; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; @@ -210,7 +210,7 @@ assertEquals(getPreviousCookie(cnIndexDB, cn2), value2); assertEquals(getPreviousCookie(cnIndexDB, cn3), value3); ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(cn1); DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1); assertCursorReadsInOrder(cursor, cn1, cn2, cn3); cursor = cnIndexDB.getCursorFrom(cn2); @@ -244,7 +244,7 @@ private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB, long changeNumber) throws Exception { ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(changeNumber); DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber); try { return cursor.getRecord().getPreviousCookie(); @@ -255,7 +255,7 @@ } } private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor, private void assertCursorReadsInOrder(DBCursor<CNIndexRecord> cursor, long... sns) throws ChangelogException { try opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -38,10 +38,11 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; import org.testng.annotations.BeforeClass; @@ -201,17 +202,17 @@ return; } ReplicaDBCursor cursor = replicaDB.generateCursorFrom(csns[0]); DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]); try { assertNull(cursor.getChange()); assertNull(cursor.getRecord()); for (int i = 1; i < csns.length; i++) { assertTrue(cursor.next()); assertEquals(cursor.getChange().getCSN(), csns[i]); assertEquals(cursor.getRecord().getCSN(), csns[i]); } assertFalse(cursor.next()); assertNull(cursor.getChange(), "Actual change=" + cursor.getChange() assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord() + ", Expected null"); } finally @@ -222,7 +223,7 @@ private void assertNotFound(JEReplicaDB replicaDB, CSN csn) { ReplicaDBCursor cursor = null; DBCursor<UpdateMsg> cursor = null; try { cursor = replicaDB.generateCursorFrom(csn); @@ -282,7 +283,7 @@ public void testGenerateCursorFrom() throws Exception { ReplicationServer replicationServer = null; ReplicaDBCursor cursor = null; DBCursor<UpdateMsg> cursor = null; try { TestCaseUtils.startServer(); @@ -301,17 +302,17 @@ cursor = replicaDB.generateCursorFrom(csns[0]); assertTrue(cursor.next()); assertEquals(cursor.getChange().getCSN(), csns[1]); assertEquals(cursor.getRecord().getCSN(), csns[1]); StaticUtils.close(cursor); cursor = replicaDB.generateCursorFrom(csns[3]); assertTrue(cursor.next()); assertEquals(cursor.getChange().getCSN(), csns[4]); assertEquals(cursor.getRecord().getCSN(), csns[4]); StaticUtils.close(cursor); cursor = replicaDB.generateCursorFrom(csns[4]); assertFalse(cursor.next()); assertNull(cursor.getChange()); assertNull(cursor.getRecord()); } finally {