| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | |
| | | * anymore, a cursor must be closed to release all the resources into the |
| | | * database. |
| | | * <p> |
| | | * Here is a typical usage pattern: |
| | | * |
| | | * The cursor provides a java.sql.ResultSet like API : it is positioned before |
| | | * the first requested record and needs to be moved forward by calling |
| | | * {@link DBCursor#next()}. |
| | | * <p> |
| | | * Usage: |
| | | * <pre> |
| | | * DBCursor<T> cursor = ...; // obtain a new cursor, |
| | | * // already initialized |
| | | * T record1 = cursor.getRecord(); // get the first record |
| | | * while (cursor.next()) { // advance to the next record |
| | | * T record = cursor.getRecord(); // get the next record |
| | | * ... // etc. |
| | | * {@code |
| | | * DBCursor cursor = ...; |
| | | * try { |
| | | * while (cursor.next()) { |
| | | * Record record = cursor.getRecord(); |
| | | * // ... can call cursor.getRecord() again: it will return the same result |
| | | * } |
| | | * } |
| | | * finally { |
| | | * close(cursor); |
| | | * } |
| | | * } |
| | | * </pre> |
| | | * |
| | |
| | | * replication domain starting after the provided {@link ServerState} for each |
| | | * replicaDBs. |
| | | * <p> |
| | | * The cursor is already advanced to the records after the serverState. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | | * replication domain and serverId starting after the provided {@link CSN}. |
| | | * <p> |
| | | * The cursor is already advanced to the records after the CSN. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | return replicaDB.generateCursorFrom(startAfterCSN); |
| | | } |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | |
| | | * Returns a cursor that allows to retrieve the update messages from this DB, |
| | | * starting at the position defined by the smallest CSN that is strictly |
| | | * higher than the provided CSN. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals {@code null} before any call to |
| | | * {@code cursor.next()} method. |
| | | * |
| | | * @param startAfterCSN |
| | | * The position where the cursor must start. If null, start from the |
| | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the first position. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals to {@code null} before any call to |
| | | * {@code cursor.next()} method. |
| | | * |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the provided key. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals to {@code null} before any call to |
| | | * {@code cursor.next()} method. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the smallest key that is higher than |
| | | * the provided key. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals to {@code null} before any call to |
| | | * {@code cursor.next()} method. After the first call to {@code cursor.next()} |
| | | * the cursor points to the record corresponding to the key found. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | |
| | | } |
| | | |
| | | /** |
| | | * Represents a cursor than can be repositioned on a given key. |
| | | * Represents a DB Cursor than can be repositioned on a given key. |
| | | * <p> |
| | | * Note that as a DBCursor, it provides a java.sql.ResultSet like API. |
| | | */ |
| | | static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>> |
| | | { |
| | |
| | | /** |
| | | * Implements a cursor on the log. |
| | | * <p> |
| | | * The cursor initially points to a record, that is {@code cursor.getRecord()} |
| | | * is equals to the first record available from the cursor before any call to |
| | | * {@code cursor.next()} method. |
| | | * <p> |
| | | * The cursor uses the log shared lock to ensure reads are not done during a rotation. |
| | | * <p> |
| | | * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()} |
| | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the first position. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals {@code null} before any call to |
| | | * {@code cursor.next()} method. |
| | | * |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the provided key. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals to {@code null} before any call to |
| | | * {@code cursor.next()} method. After the first call to {@code cursor.next()} |
| | | * the cursor points to the record corresponding to the key. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the smallest key that is higher than |
| | | * the provided key. |
| | | * <p> |
| | | * The returned cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals to {@code null} before any call to |
| | | * {@code cursor.next()} method. After the first call to {@code cursor.next()} |
| | | * the cursor points to the record corresponding to the key found. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | |
| | | return logfile.equals(other.logfile); |
| | | } |
| | | |
| | | /** |
| | | * Implements a repositionable cursor on the log file. |
| | | * <p> |
| | | * The cursor initially points to no record, that is |
| | | * {@code cursor.getRecord()} is equals to {@code null} before any call to |
| | | * {@code cursor.next()} method. |
| | | */ |
| | | /** Implements a repositionable cursor on the log file. */ |
| | | static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V> |
| | | { |
| | | /** The underlying log on which entries are read. */ |
| | |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | cursor.next(); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | return replicaDB.generateCursorFrom(startAfterCSN); |
| | | } |
| | | return EMPTY_CURSOR; |
| | | } |
| | |
| | | final CSN csn = newestMsg.getCSN(); |
| | | when(cnIndexDB.getNewestRecord()).thenReturn( |
| | | new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn)); |
| | | final SequentialDBCursor cursor = |
| | | cursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | cursor.add(newestMsg); |
| | | cursor.next(); // simulate the cursor had been initialized with this change |
| | | } |
| | | initialCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | | } |
| | |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | |
| | | private UpdateMsg msg2; |
| | | private UpdateMsg msg3; |
| | | private UpdateMsg msg4; |
| | | private UpdateMsg msg5; |
| | | private UpdateMsg msg6; |
| | | private String baseDN1 = "dc=forgerock,dc=com"; |
| | | private String baseDN2 = "dc=example,dc=com"; |
| | | |
| | |
| | | msg2 = new FakeUpdateMsg(2); |
| | | msg3 = new FakeUpdateMsg(3); |
| | | msg4 = new FakeUpdateMsg(4); |
| | | msg5 = new FakeUpdateMsg(5); |
| | | msg6 = new FakeUpdateMsg(6); |
| | | } |
| | | |
| | | @Test |
| | |
| | | } |
| | | |
| | | @Test |
| | | public void threeElementsCursor() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = |
| | | newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2, msg3), baseDN1)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN1), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void twoEmptyCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementCursors() throws Exception |
| | | public void twoThreeElementCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, msg3, msg6), baseDN1), |
| | | of(new SequentialDBCursor(msg1, msg4, msg5), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2), |
| | | of(msg5, baseDN2), |
| | | of(msg6, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementsCursor() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg1, null, msg2), baseDN1)); |
| | | assertNextRecord(compCursor, of(msg1, baseDN1)); |
| | | assertFalse(compCursor.next()); |
| | | assertNextRecord(compCursor, of(msg2, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementsCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, null, msg4), baseDN1), |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | // TODO : this test fails because msg2 is returned twice |
| | | @Test(enabled=false) |
| | | public void recycleTwoElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(null, null, msg1), baseDN1), |
| | | of(new SequentialDBCursor(msg2, msg3, msg4), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg2, baseDN2), |
| | | of(msg1, baseDN1), |
| | | of(msg3, baseDN2), |
| | | of(msg4, baseDN2)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleThreeElementsCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1), |
| | | of(new SequentialDBCursor(null, msg1, null, msg4, msg5), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2), |
| | | of(msg5, baseDN2), |
| | | of(msg6, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleThreeElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1), |
| | | of(new SequentialDBCursor(null, msg1, null, null, msg4, msg5), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2), |
| | | of(msg5, baseDN2), |
| | | of(msg6, baseDN1)); |
| | | } |
| | | |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | | Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception |
| | | { |
| | |
| | | new HashMap<DBCursor<UpdateMsg>, String>(); |
| | | for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs) |
| | | { |
| | | // The cursors in the composite are expected to be pointing |
| | | // to first record available |
| | | pair.getFirst().next(); |
| | | cursorsMap.put(pair.getFirst(), pair.getSecond()); |
| | | } |
| | | return new CompositeDBCursor<String>(cursorsMap, true); |
| | |
| | | private void assertInOrder(final CompositeDBCursor<String> compCursor, |
| | | Pair<UpdateMsg, String>... expecteds) throws ChangelogException |
| | | { |
| | | for (final Pair<UpdateMsg, String> expected : expecteds) |
| | | for (int i = 0; i < expecteds.length ; i++) |
| | | { |
| | | assertTrue(compCursor.next()); |
| | | assertSame(compCursor.getRecord(), expected.getFirst()); |
| | | assertSame(compCursor.getData(), expected.getSecond()); |
| | | final Pair<UpdateMsg, String> expected = expecteds[i]; |
| | | final String index = " at element i=" + i; |
| | | assertTrue(compCursor.next(), index); |
| | | assertSame(compCursor.getRecord(), expected.getFirst(), index); |
| | | assertSame(compCursor.getData(), expected.getSecond(), index); |
| | | } |
| | | assertFalse(compCursor.next()); |
| | | assertNull(compCursor.getRecord()); |
| | | assertNull(compCursor.getData()); |
| | | compCursor.close(); |
| | | } |
| | | |
| | | private void assertNextRecord(final CompositeDBCursor<String> compCursor, |
| | | Pair<UpdateMsg, String> expected) throws ChangelogException |
| | | { |
| | | assertTrue(compCursor.next()); |
| | | assertSame(compCursor.getRecord(), expected.getFirst()); |
| | | assertSame(compCursor.getData(), expected.getSecond()); |
| | | } |
| | | |
| | | } |
| | |
| | | public void cursorReturnsTrue() throws Exception |
| | | { |
| | | final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); |
| | | delegateCursor = new SequentialDBCursor(null, updateMsg); |
| | | delegateCursor = new SequentialDBCursor(updateMsg); |
| | | |
| | | final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | |
| | | public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception |
| | | { |
| | | final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); |
| | | delegateCursor = new SequentialDBCursor(null, updateMsg); |
| | | delegateCursor = new SequentialDBCursor(updateMsg); |
| | | |
| | | final CSN offlineCSN = new CSN(timestamp++, 1, 1); |
| | | final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN); |
| | |
| | | final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1); |
| | | |
| | | final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); |
| | | delegateCursor = new SequentialDBCursor(null, updateMsg); |
| | | delegateCursor = new SequentialDBCursor(updateMsg); |
| | | |
| | | final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | |
| | | private final List<UpdateMsg> msgs; |
| | | private UpdateMsg current; |
| | | |
| | | /** |
| | | * A cursor built from a list of update messages. |
| | | * <p> |
| | | * This cursor provides a java.sql.ResultSet-like API to be consistent with the |
| | | * {@code DBCursor} API : it is positioned before the first requested record |
| | | * and needs to be moved forward by calling {@link DBCursor#next()}. |
| | | */ |
| | | public SequentialDBCursor(UpdateMsg... msgs) |
| | | { |
| | | this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs)); |
| | | next(); |
| | | } |
| | | |
| | | public void add(UpdateMsg msg) |