mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
01.57.2014 2da62bc07d8ec56f15adb2c4128bf02fabb3885c
opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
@@ -32,15 +32,23 @@
 * anymore, a cursor must be closed to release all the resources into the
 * database.
 * <p>
 * Here is a typical usage pattern:
 *
 * The cursor provides a java.sql.ResultSet like API : it is positioned before
 * the first requested record and needs to be moved forward by calling
 * {@link DBCursor#next()}.
 * <p>
 * Usage:
 * <pre>
 * DBCursor&lt;T&gt; cursor = ...;         // obtain a new cursor,
 *                                   // already initialized
 * T record1 = cursor.getRecord();   // get the first record
 * while (cursor.next()) {           // advance to the next record
 *   T record = cursor.getRecord();  // get the next record
 *   ...                             // etc.
 * {@code
 *  DBCursor cursor = ...;
 *  try {
 *    while (cursor.next()) {
 *      Record record = cursor.getRecord();
 *      // ... can call cursor.getRecord() again: it will return the same result
 *    }
 *  }
 *  finally {
 *    close(cursor);
 *  }
 * }
 * </pre>
 *
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -96,8 +96,6 @@
   * replication domain starting after the provided {@link ServerState} for each
   * replicaDBs.
   * <p>
   * The cursor is already advanced to the records after the serverState.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
@@ -119,8 +117,6 @@
   * Generates a {@link DBCursor} for one replicaDB for the specified
   * replication domain and serverId starting after the provided {@link CSN}.
   * <p>
   * The cursor is already advanced to the records after the CSN.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -639,6 +639,7 @@
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
@@ -673,9 +674,7 @@
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
      cursor.next();
      return cursor;
      return replicaDB.generateCursorFrom(startAfterCSN);
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -206,10 +206,6 @@
   * Returns a cursor that allows to retrieve the update messages from this DB,
   * starting at the position defined by the smallest CSN that is strictly
   * higher than the provided CSN.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @param startAfterCSN
   *          The position where the cursor must start. If null, start from the
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -423,10 +423,6 @@
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
@@ -461,10 +457,6 @@
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the provided key.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -482,11 +474,6 @@
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the smallest key that is higher than
   * the provided key.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method. After the first call to {@code cursor.next()}
   * the cursor points to the record corresponding to the key found.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -942,7 +929,9 @@
  }
  /**
   * Represents a cursor than can be repositioned on a given key.
   * Represents a DB Cursor than can be repositioned on a given key.
   * <p>
   * Note that as a DBCursor, it provides a java.sql.ResultSet like API.
   */
  static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
  {
@@ -968,10 +957,6 @@
  /**
   * Implements a cursor on the log.
   * <p>
   * The cursor initially points to a record, that is {@code cursor.getRecord()}
   * is equals to the first record available from the cursor before any call to
   * {@code cursor.next()} method.
   * <p>
   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
   * <p>
   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -312,10 +312,6 @@
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
@@ -329,11 +325,6 @@
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the provided key.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method. After the first call to {@code cursor.next()}
   * the cursor points to the record corresponding to the key.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -351,11 +342,6 @@
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the smallest key that is higher than
   * the provided key.
   * <p>
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method. After the first call to {@code cursor.next()}
   * the cursor points to the record corresponding to the key found.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -571,13 +557,7 @@
    return logfile.equals(other.logfile);
  }
  /**
   * Implements a repositionable cursor on the log file.
   * <p>
   * The cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method.
   */
  /** Implements a repositionable cursor on the log file. */
  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
    /** The underlying log on which entries are read. */
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -483,6 +483,7 @@
    {
      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      cursor.next();
      map.put(serverId, cursor);
      return false;
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -714,6 +714,7 @@
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
@@ -748,9 +749,7 @@
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
      cursor.next();
      return cursor;
      return replicaDB.generateCursorFrom(startAfterCSN);
    }
    return EMPTY_CURSOR;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -630,10 +630,8 @@
        final CSN csn = newestMsg.getCSN();
        when(cnIndexDB.getNewestRecord()).thenReturn(
            new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
        final SequentialDBCursor cursor =
            cursors.get(Pair.of(baseDN, csn.getServerId()));
        final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
        cursor.add(newestMsg);
        cursor.next(); // simulate the cursor had been initialized with this change
      }
      initialCookie.update(msg.getBaseDN(), msg.getCSN());
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -29,6 +29,7 @@
import java.util.Map;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -49,6 +50,8 @@
  private UpdateMsg msg2;
  private UpdateMsg msg3;
  private UpdateMsg msg4;
  private UpdateMsg msg5;
  private UpdateMsg msg6;
  private String baseDN1 = "dc=forgerock,dc=com";
  private String baseDN2 = "dc=example,dc=com";
@@ -59,6 +62,8 @@
    msg2 = new FakeUpdateMsg(2);
    msg3 = new FakeUpdateMsg(3);
    msg4 = new FakeUpdateMsg(4);
    msg5 = new FakeUpdateMsg(5);
    msg6 = new FakeUpdateMsg(6);
  }
  @Test
@@ -88,6 +93,17 @@
  }
  @Test
  public void threeElementsCursor() throws Exception
  {
    final CompositeDBCursor<String> compCursor =
        newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2, msg3), baseDN1));
    assertInOrder(compCursor,
        of(msg1, baseDN1),
        of(msg2, baseDN1),
        of(msg3, baseDN1));
  }
  @Test
  public void twoEmptyCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -121,7 +137,32 @@
  }
  @Test
  public void recycleTwoElementCursors() throws Exception
  public void twoThreeElementCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, msg3, msg6), baseDN1),
        of(new SequentialDBCursor(msg1, msg4, msg5), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2),
        of(msg5, baseDN2),
        of(msg6, baseDN1));
  }
  @Test
  public void recycleTwoElementsCursor() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg1, null, msg2), baseDN1));
    assertNextRecord(compCursor, of(msg1, baseDN1));
    assertFalse(compCursor.next());
    assertNextRecord(compCursor, of(msg2, baseDN1));
  }
  @Test
  public void recycleTwoElementsCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
@@ -133,6 +174,50 @@
        of(msg4, baseDN1));
  }
  // TODO : this test fails because msg2 is returned twice
  @Test(enabled=false)
  public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(null, null, msg1), baseDN1),
        of(new SequentialDBCursor(msg2, msg3, msg4), baseDN2));
    assertInOrder(compCursor,
        of(msg2, baseDN2),
        of(msg1, baseDN1),
        of(msg3, baseDN2),
        of(msg4, baseDN2));
  }
  @Test
  public void recycleThreeElementsCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
        of(new SequentialDBCursor(null, msg1, null, msg4, msg5), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2),
        of(msg5, baseDN2),
        of(msg6, baseDN1));
  }
  @Test
  public void recycleThreeElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
        of(new SequentialDBCursor(null, msg1, null, null, msg4, msg5), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2),
        of(msg5, baseDN2),
        of(msg6, baseDN1));
  }
  private CompositeDBCursor<String> newCompositeDBCursor(
      Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
  {
@@ -140,6 +225,9 @@
        new HashMap<DBCursor<UpdateMsg>, String>();
    for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
    {
      // The cursors in the composite are expected to be pointing
      // to first record available
      pair.getFirst().next();
      cursorsMap.put(pair.getFirst(), pair.getSecond());
    }
    return new CompositeDBCursor<String>(cursorsMap, true);
@@ -148,13 +236,26 @@
  private void assertInOrder(final CompositeDBCursor<String> compCursor,
      Pair<UpdateMsg, String>... expecteds) throws ChangelogException
  {
    for (final Pair<UpdateMsg, String> expected : expecteds)
    for (int i = 0; i < expecteds.length ; i++)
    {
      assertTrue(compCursor.next());
      assertSame(compCursor.getRecord(), expected.getFirst());
      assertSame(compCursor.getData(), expected.getSecond());
      final Pair<UpdateMsg, String> expected = expecteds[i];
      final String index = " at element i=" + i;
      assertTrue(compCursor.next(), index);
      assertSame(compCursor.getRecord(), expected.getFirst(), index);
      assertSame(compCursor.getData(), expected.getSecond(), index);
    }
    assertFalse(compCursor.next());
    assertNull(compCursor.getRecord());
    assertNull(compCursor.getData());
    compCursor.close();
  }
  private void assertNextRecord(final CompositeDBCursor<String> compCursor,
      Pair<UpdateMsg, String> expected) throws ChangelogException
  {
    assertTrue(compCursor.next());
    assertSame(compCursor.getRecord(), expected.getFirst());
    assertSame(compCursor.getData(), expected.getSecond());
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -65,7 +65,7 @@
  public void cursorReturnsTrue() throws Exception
  {
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
@@ -95,7 +95,7 @@
  public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
  {
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
@@ -116,7 +116,7 @@
    final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
    assertThat(cursor.getRecord()).isNull();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -39,10 +39,16 @@
  private final List<UpdateMsg> msgs;
  private UpdateMsg current;
  /**
   * A cursor built from a list of update messages.
   * <p>
   * This cursor provides a java.sql.ResultSet-like API to be consistent with the
   * {@code DBCursor} API : it is positioned before the first requested record
   * and needs to be moved forward by calling {@link DBCursor#next()}.
   */
  public SequentialDBCursor(UpdateMsg... msgs)
  {
    this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
    next();
  }
  public void add(UpdateMsg msg)