mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
18.00.2014 3ee70c76e7e5c1cfb8c5ce16bc3f50f2306bdee7
Forward port of checkpoint commit for OPENDJ-1471 File based changelog : improve cursor behavior

All cursors related to ReplicaDB now behave like java.sql.ResultSet API in JE implementation

(File based changelog implementation will be ported later)

* JEChangelogDB#getCursorFrom(DN, int, CSN) now returns a java.sql.ResultSet style cursor

* ChangeNumberIndex : advance cursor to first record to keep current behavior

* Update Javadoc in several classes : DBCursor, ReplicationDomainDB, JEReplicaDB,
JEReplicaDBCursor

* SequentialDBCursor (used in tests only) now behaves like java.sql.ResultSet API

* ChangeNumberIndexerTest : adapt to changes of SequentialDBCursor

* CompositeDBCursorTest : add more tests

* ReplicaOfflineCursorTest : adapt to changes of SequentialDBCursor
8 files modified
164 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java 26 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 4 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 1 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 5 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 4 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 110 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java 6 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java 8 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
@@ -32,15 +32,23 @@
 * anymore, a cursor must be closed to release all the resources into the
 * database.
 * <p>
 * Here is a typical usage pattern:
 *
 * The cursor provides a java.sql.ResultSet like API : it is positioned before
 * the first requested record and needs to be moved forward by calling
 * {@link DBCursor#next()}.
 * <p>
 * Usage:
 * <pre>
 * DBCursor&lt;T&gt; cursor = ...;         // obtain a new cursor,
 *                                   // already initialized
 * T record1 = cursor.getRecord();   // get the first record
 * while (cursor.next()) {           // advance to the next record
 *   T record = cursor.getRecord();  // get the next record
 *   ...                             // etc.
 * {@code
 *  DBCursor cursor = ...;
 *  try {
 *    while (cursor.next()) {
 *      Record record = cursor.getRecord();
 *      // ... can call cursor.getRecord() again: it will return the same result
 *    }
 *  }
 *  finally {
 *    close(cursor);
 *  }
 * }
 * </pre>
 *
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -96,8 +96,6 @@
   * replication domain starting after the provided {@link ServerState} for each
   * replicaDBs.
   * <p>
   * The cursor is already advanced to the records after the serverState.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
@@ -119,8 +117,6 @@
   * Generates a {@link DBCursor} for one replicaDB for the specified
   * replication domain and serverId starting after the provided {@link CSN}.
   * <p>
   * The cursor is already advanced to the records after the CSN.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -463,6 +463,7 @@
    {
      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      cursor.next();
      map.put(serverId, cursor);
      return false;
    }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -705,6 +705,7 @@
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
@@ -739,9 +740,7 @@
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
      cursor.next();
      return cursor;
      return replicaDB.generateCursorFrom(startAfterCSN);
    }
    return EMPTY_CURSOR;
  }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -631,10 +631,8 @@
        final CSN csn = newestMsg.getCSN();
        when(cnIndexDB.getNewestRecord()).thenReturn(
            new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
        final SequentialDBCursor cursor =
            cursors.get(Pair.of(baseDN, csn.getServerId()));
        final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
        cursor.add(newestMsg);
        cursor.next(); // simulate the cursor had been initialized with this change
      }
      initialCookie.update(msg.getBaseDN(), msg.getCSN());
    }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -49,6 +49,8 @@
  private UpdateMsg msg2;
  private UpdateMsg msg3;
  private UpdateMsg msg4;
  private UpdateMsg msg5;
  private UpdateMsg msg6;
  private String baseDN1 = "dc=forgerock,dc=com";
  private String baseDN2 = "dc=example,dc=com";
@@ -59,6 +61,8 @@
    msg2 = new FakeUpdateMsg(2);
    msg3 = new FakeUpdateMsg(3);
    msg4 = new FakeUpdateMsg(4);
    msg5 = new FakeUpdateMsg(5);
    msg6 = new FakeUpdateMsg(6);
  }
  @Test
@@ -88,6 +92,17 @@
  }
  @Test
  public void threeElementsCursor() throws Exception
  {
    final CompositeDBCursor<String> compCursor =
        newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2, msg3), baseDN1));
    assertInOrder(compCursor,
        of(msg1, baseDN1),
        of(msg2, baseDN1),
        of(msg3, baseDN1));
  }
  @Test
  public void twoEmptyCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -121,7 +136,32 @@
  }
  @Test
  public void recycleTwoElementCursors() throws Exception
  public void twoThreeElementCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, msg3, msg6), baseDN1),
        of(new SequentialDBCursor(msg1, msg4, msg5), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2),
        of(msg5, baseDN2),
        of(msg6, baseDN1));
  }
  @Test
  public void recycleTwoElementsCursor() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg1, null, msg2), baseDN1));
    assertNextRecord(compCursor, of(msg1, baseDN1));
    assertFalse(compCursor.next());
    assertNextRecord(compCursor, of(msg2, baseDN1));
  }
  @Test
  public void recycleTwoElementsCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
@@ -133,6 +173,50 @@
        of(msg4, baseDN1));
  }
  // TODO : this test fails because msg2 is returned twice
  @Test(enabled=false)
  public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(null, null, msg1), baseDN1),
        of(new SequentialDBCursor(msg2, msg3, msg4), baseDN2));
    assertInOrder(compCursor,
        of(msg2, baseDN2),
        of(msg1, baseDN1),
        of(msg3, baseDN2),
        of(msg4, baseDN2));
  }
  @Test
  public void recycleThreeElementsCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
        of(new SequentialDBCursor(null, msg1, null, msg4, msg5), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2),
        of(msg5, baseDN2),
        of(msg6, baseDN1));
  }
  @Test
  public void recycleThreeElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
        of(new SequentialDBCursor(null, msg1, null, null, msg4, msg5), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2),
        of(msg5, baseDN2),
        of(msg6, baseDN1));
  }
  private CompositeDBCursor<String> newCompositeDBCursor(
      Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
  {
@@ -140,6 +224,9 @@
        new HashMap<DBCursor<UpdateMsg>, String>();
    for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
    {
      // The cursors in the composite are expected to be pointing
      // to first record available
      pair.getFirst().next();
      cursorsMap.put(pair.getFirst(), pair.getSecond());
    }
    return new CompositeDBCursor<String>(cursorsMap, true);
@@ -148,13 +235,26 @@
  private void assertInOrder(final CompositeDBCursor<String> compCursor,
      Pair<UpdateMsg, String>... expecteds) throws ChangelogException
  {
    for (final Pair<UpdateMsg, String> expected : expecteds)
    for (int i = 0; i < expecteds.length ; i++)
    {
      final Pair<UpdateMsg, String> expected = expecteds[i];
      final String index = " at element i=" + i;
      assertTrue(compCursor.next(), index);
      assertSame(compCursor.getRecord(), expected.getFirst(), index);
      assertSame(compCursor.getData(), expected.getSecond(), index);
    }
    assertFalse(compCursor.next());
    assertNull(compCursor.getRecord());
    assertNull(compCursor.getData());
    compCursor.close();
  }
  private void assertNextRecord(final CompositeDBCursor<String> compCursor,
      Pair<UpdateMsg, String> expected) throws ChangelogException
    {
      assertTrue(compCursor.next());
      assertSame(compCursor.getRecord(), expected.getFirst());
      assertSame(compCursor.getData(), expected.getSecond());
    }
    assertFalse(compCursor.next());
    compCursor.close();
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -65,7 +65,7 @@
  public void cursorReturnsTrue() throws Exception
  {
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
@@ -95,7 +95,7 @@
  public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
  {
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
@@ -116,7 +116,7 @@
    final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
    assertThat(cursor.getRecord()).isNull();
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -39,10 +39,16 @@
  private final List<UpdateMsg> msgs;
  private UpdateMsg current;
  /**
   * A cursor built from a list of update messages.
   * <p>
   * This cursor provides a java.sql.ResultSet-like API to be consistent with the
   * {@code DBCursor} API : it is positioned before the first requested record
   * and needs to be moved forward by calling {@link DBCursor#next()}.
   */
  public SequentialDBCursor(UpdateMsg... msgs)
  {
    this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
    next();
  }
  public void add(UpdateMsg msg)