opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -37,7 +37,6 @@ import org.forgerock.i18n.LocalizableMessageBuilder; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigException; import org.forgerock.util.Reject; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.replication.common.CSN; @@ -578,7 +577,10 @@ { firstException = e; } else logger.traceException(e); else { logger.traceException(e); } } } } @@ -757,12 +759,10 @@ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, final PositionStrategy positionStrategy) throws ChangelogException { Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY" + " is not supported for the JE implementation fo changelog"); final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN); final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); @@ -853,12 +853,12 @@ { indexer.replicaOffline(baseDN, offlineCSN); } updateCursorsWithOfflineCSN(baseDN, offlineCSN); updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN); } private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN) private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN) { final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN)); final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); if (cursors != null && !cursors.isEmpty()) { for (ReplicaCursor cursor : cursors) opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,6 +41,7 @@ import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; @@ -177,18 +178,20 @@ * Generate a new {@link DBCursor} that allows to browse the db managed by * this ReplicaDB and starting at the position defined by a given CSN. * * @param startAfterCSN * @param startCSN * The position where the cursor must start. If null, start from the * oldest CSN * @param positionStrategy * indicates at which exact position the cursor must start * @return a new {@link DBCursor} that allows to browse the db managed by this * ReplicaDB and starting at the position defined by a given CSN. * @throws ChangelogException * if a database problem happened */ public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException { return new JEReplicaDBCursor(db, startAfterCSN, this); return new JEReplicaDBCursor(db, startCSN, positionStrategy, this); } /** opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,18 +32,21 @@ import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; /** * Berkeley DB JE implementation of {@link DBCursor}. * * \@NotThreadSafe */ public class JEReplicaDBCursor implements DBCursor<UpdateMsg> class JEReplicaDBCursor implements DBCursor<UpdateMsg> { private UpdateMsg currentChange; private ReplServerDBCursor cursor; private final ReplicationDB db; private final PositionStrategy positionStrategy; private JEReplicaDB replicaDB; private ReplicationDB db; private CSN lastNonNullCurrentCSN; private ReplServerDBCursor cursor; private UpdateMsg currentChange; /** * Creates a new {@link JEReplicaDBCursor}. All created cursor must be @@ -51,20 +54,23 @@ * * @param db * The db where the cursor must be created. * @param startAfterCSN * @param startCSN * The CSN after which the cursor must start.If null, start from the * oldest CSN * @param positionStrategy * indicates at which exact position the cursor must start * @param replicaDB * The associated JEReplicaDB. * @throws ChangelogException * if a database problem happened. * if a database problem happened. */ public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN, public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy, JEReplicaDB replicaDB) throws ChangelogException { this.db = db; this.positionStrategy = positionStrategy; this.replicaDB = replicaDB; this.lastNonNullCurrentCSN = startAfterCSN; this.lastNonNullCurrentCSN = startCSN; } /** {@inheritDoc} */ @@ -78,9 +84,6 @@ @Override public boolean next() throws ChangelogException { final ReplServerDBCursor localCursor = cursor; currentChange = localCursor != null ? localCursor.next() : null; if (currentChange == null) { synchronized (this) @@ -91,10 +94,18 @@ // if following code is called while the cursor is closed. // It is better to let the deadlock happen to help quickly identifying // and fixing such issue with unit tests. cursor = db.openReadCursor(lastNonNullCurrentCSN); currentChange = cursor.next(); cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy); } } // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized. if (positionStrategy == ON_MATCHING_KEY && currentChange != null || positionStrategy == AFTER_MATCHING_KEY) { cursor.next(); } currentChange = cursor.getRecord(); if (currentChange != null) { lastNonNullCurrentCSN = currentChange.getCSN(); @@ -110,7 +121,7 @@ synchronized (this) { closeCursor(); this.replicaDB = null; replicaDB = null; } } @@ -120,11 +131,12 @@ { cursor.close(); cursor = null; currentChange = null; } } /** * Called by the Gc when the object is garbage collected. Release the internal * Called by the GC when the object is garbage collected. Release the internal * cursor in case the cursor was badly used and {@link #close()} was never * called. */ @@ -138,7 +150,9 @@ @Override public String toString() { return getClass().getSimpleName() + " currentChange=" + currentChange return getClass().getSimpleName() + " positionStrategy=" + positionStrategy + " currentChange=" + currentChange + " replicaDB=" + replicaDB; } } opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,7 +26,6 @@ */ package org.opends.server.replication.server.changelog.je; import java.io.Closeable; import org.forgerock.i18n.slf4j.LocalizedLogger; import java.io.UnsupportedEncodingException; import java.util.concurrent.locks.ReadWriteLock; @@ -39,6 +38,8 @@ import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; @@ -244,15 +245,18 @@ private DatabaseEntry createReplicationKey(CSN csn) { DatabaseEntry key = new DatabaseEntry(); try final DatabaseEntry key = new DatabaseEntry(); if (csn != null) { key.setData(csn.toString().getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happens, UTF-8 is always supported // TODO : add better logging try { key.setData(csn.toString().getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happens, UTF-8 is always supported // TODO : add better logging } } return key; } @@ -285,13 +289,15 @@ * @param startCSN * The CSN from which the cursor must start.If null, start from the * oldest CSN * @param positionStrategy * indicates at which exact position the cursor must start * @return The ReplServerDBCursor. * @throws ChangelogException * If a database problem happened */ ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException { return new ReplServerDBCursor(startCSN); return new ReplServerDBCursor(startCSN, positionStrategy); } /** @@ -445,7 +451,7 @@ * This Class implements a cursor that can be used to browse a * replicationServer database. */ class ReplServerDBCursor implements Closeable class ReplServerDBCursor implements DBCursor<UpdateMsg> { /** * The transaction that will protect the actions done with the cursor. @@ -454,12 +460,14 @@ * <p> * Will be set non null for a write cursor */ private final Transaction txn; private final Cursor cursor; private final DatabaseEntry key; private final DatabaseEntry data; /** \@Null for read cursors, \@NotNull for deleting cursors. */ private final Transaction txn; private UpdateMsg currentRecord; private boolean isClosed = false; private boolean isClosed; /** * Creates a ReplServerDBCursor that can be used for browsing a @@ -467,21 +475,15 @@ * * @param startCSN * The CSN from which the cursor must start. * @param positionStrategy * indicates at which exact position the cursor must start * @throws ChangelogException * When the startCSN does not exist. */ private ReplServerDBCursor(CSN startCSN) throws ChangelogException private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException { if (startCSN != null) { key = createReplicationKey(startCSN); } else { key = new DatabaseEntry(); } key = createReplicationKey(startCSN); data = new DatabaseEntry(); txn = null; // Take the lock. From now on, whatever error that happen in the life @@ -515,18 +517,25 @@ return; } // We can move close to the startCSN. // Let's create a cursor from that point. DatabaseEntry aKey = new DatabaseEntry(); DatabaseEntry aData = new DatabaseEntry(); if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS) if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY) { localCursor.close(); localCursor = db.openCursor(txn, null); // We can move close to the startCSN. // Let's create a cursor from that point. key.setData(null); if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) { localCursor.close(); localCursor = db.openCursor(txn, null); } } } cursor = localCursor; cursorHeld = cursor != null; if (key.getData() != null) { computeCurrentRecord(); } } catch (DatabaseException e) { @@ -604,6 +613,7 @@ return; } isClosed = true; currentRecord = null; } closeAndReleaseReadLock(cursor); @@ -658,6 +668,7 @@ return null; } currentRecord = null; try { if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS) @@ -672,60 +683,73 @@ } } /** * Get the next UpdateMsg from this cursor. * * @return the next UpdateMsg. */ UpdateMsg next() /** {@inheritDoc} */ @Override public boolean next() throws ChangelogException { if (isClosed) { return null; return false; } UpdateMsg currentChange = null; while (currentChange == null) currentRecord = null; while (currentRecord == null) { try { if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS) { return null; return false; } } catch (DatabaseException e) { return null; throw new ChangelogException(e); } CSN csn = null; try { csn = toCSN(key.getData()); if (isACounterRecord(csn)) { continue; } currentChange = (UpdateMsg) ReplicationMsg.generateMsg( data.getData(), ProtocolVersion.getCurrentVersion()); } catch (Exception e) { /* * An error happening trying to convert the data from the * replicationServer database to an Update LocalizableMessage. This can only * happen if the database is corrupted. There is not much more that we * can do at this point except trying to continue with the next * record. In such case, it is therefore possible that we miss some * changes. * TODO : This should be handled by the repair functionality. */ logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(), csn, e.getMessage()); } computeCurrentRecord(); } return currentChange; return currentRecord != null; } private void computeCurrentRecord() { CSN csn = null; try { csn = toCSN(key.getData()); if (isACounterRecord(csn)) { return; } currentRecord = toRecord(data.getData()); } catch (Exception e) { /* * An error happening trying to convert the data from the * replicationServer database to an Update Message. This can only * happen if the database is corrupted. There is not much more that we * can do at this point except trying to continue with the next * record. In such case, it is therefore possible that we miss some * changes. * TODO : This should be handled by the repair functionality. */ logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(), csn, e.getMessage()); } } private UpdateMsg toRecord(final byte[] data) throws Exception { final short currentVersion = ProtocolVersion.getCurrentVersion(); return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion); } /** {@inheritDoc} */ @Override public UpdateMsg getRecord() { return currentRecord; } /** opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -28,7 +28,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import org.assertj.core.api.SoftAssertions; import org.opends.server.TestCaseUtils; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.forgerock.opendj.config.server.ConfigException; @@ -40,13 +43,16 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.opends.server.util.StaticUtils.*; import static org.testng.Assert.*; /** @@ -81,11 +87,12 @@ void testTrim() throws Exception { ReplicationServer replicationServer = null; JEReplicaDB replicaDB = null; try { TestCaseUtils.startServer(); replicationServer = configureReplicationServer(100, 5000); final JEReplicaDB replicaDB = newReplicaDB(replicationServer); replicaDB = newReplicaDB(replicationServer); CSN[] csns = newCSNs(1, 0, 5); @@ -97,7 +104,7 @@ //-- // Iterator tests with changes persisted assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); assertNotFound(replicaDB, csns[4]); assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); assertEquals(replicaDB.getOldestCSN(), csns[0]); assertEquals(replicaDB.getNewestCSN(), csns[2]); @@ -110,7 +117,7 @@ // Test cursor from existing CSN assertFoundInOrder(replicaDB, csns[2], csns[3]); assertFoundInOrder(replicaDB, csns[3]); assertNotFound(replicaDB, csns[4]); assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); @@ -133,6 +140,7 @@ } finally { shutdown(replicaDB); remove(replicationServer); } } @@ -182,38 +190,34 @@ return; } DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]); try { assertNull(cursor.getRecord()); for (int i = 1; i < csns.length; i++) { final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); assertTrue(cursor.next(), msg); assertEquals(cursor.getRecord().getCSN(), csns[i], msg); } assertFalse(cursor.next()); assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord() + ", Expected null"); } finally { StaticUtils.close(cursor); } assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns); assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns); } private void assertNotFound(JEReplicaDB replicaDB, CSN csn) throws Exception private void assertFoundInOrder(JEReplicaDB replicaDB, final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException { DBCursor<UpdateMsg> cursor = null; DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy); try { cursor = replicaDB.generateCursorFrom(csn); assertFalse(cursor.next()); assertNull(cursor.getRecord()); assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++) { final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); final SoftAssertions softly = new SoftAssertions(); softly.assertThat(cursor.next()).as(msg).isTrue(); softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]); softly.assertAll(); } final SoftAssertions softly = new SoftAssertions(); softly.assertThat(cursor.next()).isFalse(); softly.assertThat(cursor.getRecord()).isNull(); softly.assertAll(); } finally { StaticUtils.close(cursor); close(cursor); } } @@ -226,11 +230,12 @@ void testClear() throws Exception { ReplicationServer replicationServer = null; JEReplicaDB replicaDB = null; try { TestCaseUtils.startServer(); replicationServer = configureReplicationServer(100, 5000); JEReplicaDB replicaDB = newReplicaDB(replicationServer); replicaDB = newReplicaDB(replicationServer); CSN[] csns = newCSNs(1, 0, 3); @@ -250,6 +255,7 @@ } finally { shutdown(replicaDB); remove(replicationServer); } } @@ -258,7 +264,6 @@ public void testGenerateCursorFrom() throws Exception { ReplicationServer replicationServer = null; DBCursor<UpdateMsg> cursor = null; JEReplicaDB replicaDB = null; try { @@ -266,38 +271,69 @@ replicationServer = configureReplicationServer(100000, 10); replicaDB = newReplicaDB(replicationServer); CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6); for (int i = 0; i < 5; i++) final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5); final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns)); csns2.remove(csns[3]); for (CSN csn : csns2) { if (i != 3) { replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); } replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); } cursor = replicaDB.generateCursorFrom(csns[0]); assertTrue(cursor.next()); assertEquals(cursor.getRecord().getCSN(), csns[1]); StaticUtils.close(cursor); for (CSN csn : csns2) { assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn); } assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]); cursor = replicaDB.generateCursorFrom(csns[3]); assertTrue(cursor.next()); assertEquals(cursor.getRecord().getCSN(), csns[4]); StaticUtils.close(cursor); cursor = replicaDB.generateCursorFrom(csns[4]); assertFalse(cursor.next()); assertNull(cursor.getRecord()); for (int i = 0; i < csns2.size() - 1; i++) { assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1)); } assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); } finally { StaticUtils.close(cursor); if (replicaDB != null) replicaDB.shutdown(); shutdown(replicaDB); remove(replicationServer); } } private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN, final PositionStrategy positionStrategy, final CSN expectedCSN) throws ChangelogException { DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); try { final SoftAssertions softly = new SoftAssertions(); softly.assertThat(cursor.next()).isTrue(); softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN); softly.assertAll(); } finally { close(cursor); } } private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN, final PositionStrategy positionStrategy) throws ChangelogException { DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); try { final SoftAssertions softly = new SoftAssertions(); softly.assertThat(cursor.next()).isFalse(); softly.assertThat(cursor.getRecord()).isNull(); softly.assertAll(); } finally { close(cursor); } } /** * Test the logic that manages counter records in the JEReplicaDB in order to * optimize the oldest and newest records in the replication changelog db. @@ -395,13 +431,22 @@ } finally { if (replicaDB != null) replicaDB.shutdown(); shutdown(replicaDB); if (dbEnv != null) { dbEnv.shutdown(); } remove(replicationServer); TestCaseUtils.deleteDirectory(testRoot); } } private void shutdown(JEReplicaDB replicaDB) { if (replicaDB != null) { replicaDB.shutdown(); } } }