mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
19.06.2014 174074dccd4a4731728bfb274ed982534c45fde1
Forward port of checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend 
to support cn=changelog
CR-4053

Preparatory work for the changelog backend: add a new behavior to cursors, in order to be
able to start cursor at a given key instead of starting just after.
In order to avoid introducing another boolean in the methods, I created two enums to
define the behavior of a cursor.

* DBCursor class : add two enums KeyMatchingStrategy and PositionStrategy
** The new behavior corresponds to PositionStrategy.ON_MATCHING_KEY, which allow
to position on the record with the given key (while AFTER_MATCHING_KEY position
just after the record with the given key).

* ReplicationDomainDB : add PositionStrategy argument for all methods that returns a cursor

* ReplicationServerDomain : getCursorFrom(DN, ServerState) method calls underlying method
with PositionStrategy.AFTER_MATCHING_KEY

* DomainDBCursor, MultiDomainDBCursor : add PositionStrategy argument to the constructor,
pass the strategy to underlying cursors

* ChangeNumberIndexer : use AFTER_MATCHING_KEY strategy when retrieving the
cursor (no behavior change)

* JEChangelogDB : add PositionStrategy argument for all methods that returns a cursor,
but getCursorFrom(DN, int, CSN, PositionStrategy) method does NOT implement the
new ON_MATCHING_KEY strategy.

* Update of tests classes to match method signature but no new tests added for the
new behavior (ON_MATCHING_KEY) - to be done later
9 files modified
132 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 5 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java 23 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 32 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 3 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java 13 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 32 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java 10 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 13 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 1 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -53,6 +53,7 @@
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -1347,12 +1348,12 @@
   * @return a non null {@link DBCursor} going from oldest to newest CSN
   * @throws ChangelogException
   *           If a database problem happened
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
   */
  public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
      throws ChangelogException
  {
    return domainDB.getCursorFrom(baseDN, startAfterServerState);
    return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -60,6 +60,29 @@
{
  /**
   * Represents a cursor key matching strategy, which allow to choose if only
   * the exact key must be found or if any key equals or higher should match.
   */
  public enum KeyMatchingStrategy {
    /** matches only if the exact key is found. */
    EQUAL_TO_KEY,
    /** matches if the key or a greater key is found. */
    GREATER_THAN_OR_EQUAL_TO_KEY
  }
  /**
   * Represents a cursor positioning strategy, which allow to choose if the start point
   * corresponds to the record at the provided key or the record just after the provided
   * key.
   */
  public enum PositionStrategy {
    /** start point is on the matching key. */
    ON_MATCHING_KEY,
    /** start point is after the matching key. */
    AFTER_MATCHING_KEY
  }
  /**
   * Getter for the current record.
   *
   * @return The current record.
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -29,6 +29,7 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
@@ -92,30 +93,33 @@
  void removeDomain(DN baseDN) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting after the
   * Generates a {@link DBCursor} across all the domains starting at or after the
   * provided {@link MultiDomainServerState} for each domain.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startAfterState
   * @param startState
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState)
   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  // serverId methods
  /**
   * Generates a {@link DBCursor} across all the replicaDBs for the specified
   * replication domain starting after the provided {@link ServerState} for each
   * replication domain starting at or after the provided {@link ServerState} for each
   * replicaDBs.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
@@ -124,21 +128,24 @@
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param startAfterState
   * @param startState
   *          Starting point for each ReplicaDB cursor. If any CSN for a
   *          replicaDB is null, then start from the oldest CSN for this
   *          replicaDB
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, int, CSN)
   * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  /**
   * Generates a {@link DBCursor} for one replicaDB for the specified
   * replication domain and serverId starting after the provided {@link CSN}.
   * replication domain and serverId starting at or after the provided {@link CSN}.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -148,14 +155,17 @@
   *          the replication domain baseDN of the replicaDB
   * @param serverId
   *          the serverId of the replicaDB
   * @param startAfterCSN
   * @param startCSN
   *          Starting point for the ReplicaDB cursor. If the CSN is null, then
   *          start from the oldest CSN for this replicaDB
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
      throws ChangelogException;
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -42,6 +42,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -335,7 +336,7 @@
      }
    }
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -54,6 +54,8 @@
   */
  private static final CSN NULL_CSN = new CSN(0, 0, 0);
  private final PositionStrategy positionStrategy;
  /**
   * Builds a DomainDBCursor instance.
   *
@@ -61,11 +63,15 @@
   *          the replication domain baseDN of this cursor
   * @param domainDB
   *          the DB for the provided replication domain
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   */
  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
  {
    this.baseDN = baseDN;
    this.domainDB = domainDB;
    this.positionStrategy = positionStrategy;
  }
  /**
@@ -102,8 +108,9 @@
      final Entry<Integer, CSN> pair = iter.next();
      final int serverId = pair.getKey();
      final CSN csn = pair.getValue();
      final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
      final DBCursor<UpdateMsg> cursor =
          domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
      addCursor(cursor, null);
      iter.remove();
    }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -36,6 +36,7 @@
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.util.Reject;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
@@ -46,6 +47,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
@@ -696,37 +698,38 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
    }
    return cursor;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
@@ -753,15 +756,18 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
        + " is not supported for the JE implementation fo changelog");
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
          replicaDB.generateCursorFrom(startCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -50,15 +50,21 @@
  private final ConcurrentSkipListSet<DN> removeDomains =
      new ConcurrentSkipListSet<DN>();
  private final PositionStrategy positionStrategy;
  /**
   * Builds a MultiDomainDBCursor instance.
   *
   * @param domainDB
   *          the replication domain management DB
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   */
  public MultiDomainDBCursor(ReplicationDomainDB domainDB)
  public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
  {
    this.domainDB = domainDB;
    this.positionStrategy = positionStrategy;
  }
  /**
@@ -86,7 +92,7 @@
      final Entry<DN, ServerState> entry = iter.next();
      final DN baseDN = entry.getKey();
      final ServerState serverState = entry.getValue();
      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
      addCursor(domainDBCursor, baseDN);
      iter.remove();
    }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -54,6 +54,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
 * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
@@ -158,7 +159,7 @@
  {
    MockitoAnnotations.initMocks(this);
    multiDomainCursor = new MultiDomainDBCursor(domainDB);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
    initialState = new ChangelogState();
    initialCookie = new MultiDomainServerState();
    replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
@@ -167,8 +168,8 @@
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
        multiDomainCursor);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
      .thenReturn(multiDomainCursor);
  }
  @AfterMethod
@@ -596,15 +597,15 @@
      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
      if (domainDBCursor == null)
      {
        domainDBCursor = new DomainDBCursor(baseDN, domainDB);
        domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
        domainDBCursors.put(baseDN, domainDBCursor);
        multiDomainCursor.addDomain(baseDN, null);
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
            .thenReturn(domainDBCursor);
      }
      domainDBCursor.addReplicaDB(serverId, null);
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
          .thenReturn(replicaDBCursor);
    }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -187,6 +187,7 @@
        of(msg4, baseDN1));
  }
  @Test
  public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(