mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.16.2013 4f567bc323d14f10807e50d152acbb25cb5493ad
Renamed:
- ChangelogDB to ChangeNumberIndexDB
- ChangelogDBIterator to ChangeNumberIndexDBCursor
2 files renamed
6 files modified
235 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 58 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 83 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java 8 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 31 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java 24 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,9 +41,7 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogDBIterator;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
@@ -66,8 +64,8 @@
   */
  private String operationId;
  /** Iterator on the changelogDB database. */
  private ChangelogDBIterator changelogDBIter = null;
  /** Cursor on the {@link ChangeNumberIndexDB}. */
  private ChangeNumberIndexDBCursor cnIndexDBCursor;
  private boolean draftCompat = false;
  /**
@@ -564,13 +562,14 @@
  private String findCookie(final int startDraftCN) throws ChangelogException,
      DirectoryException
  {
    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
    final ChangeNumberIndexDB cnIndexDB =
        replicationServer.getChangeNumberIndexDB();
    if (startDraftCN <= 1)
    {
      // Request filter DOES NOT contain any firstDraftCN
      // So we'll generate from the first DraftCN in the DraftCNdb
      if (changelogDB.isEmpty())
      if (cnIndexDB.isEmpty())
      {
        // FIXME JNR if we find a way to make draftCNDb.isEmpty() a non costly
        // operation, then I think we can move this check to the top of this
@@ -579,21 +578,21 @@
        return null;
      }
      final int firstDraftCN = changelogDB.getFirstDraftCN();
      final int firstDraftCN = cnIndexDB.getFirstDraftCN();
      final String crossDomainStartState =
          changelogDB.getPreviousCookie(firstDraftCN);
      changelogDBIter = changelogDB.generateIterator(firstDraftCN);
          cnIndexDB.getPreviousCookie(firstDraftCN);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(firstDraftCN);
      return crossDomainStartState;
    }
    // Request filter DOES contain a startDraftCN
    // Read the draftCNDb to see whether it contains startDraftCN
    String crossDomainStartState = changelogDB.getPreviousCookie(startDraftCN);
    String crossDomainStartState = cnIndexDB.getPreviousCookie(startDraftCN);
    if (crossDomainStartState != null)
    {
      // found the provided startDraftCN, let's return it
      changelogDBIter = changelogDB.generateIterator(startDraftCN);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(startDraftCN);
      return crossDomainStartState;
    }
@@ -612,10 +611,10 @@
    // the DB, let's use the lower limit.
    if (startDraftCN < firstDraftCN)
    {
      crossDomainStartState = changelogDB.getPreviousCookie(firstDraftCN);
      crossDomainStartState = cnIndexDB.getPreviousCookie(firstDraftCN);
      if (crossDomainStartState != null)
      {
        changelogDBIter = changelogDB.generateIterator(firstDraftCN);
        cnIndexDBCursor = cnIndexDB.getCursorFrom(firstDraftCN);
        return crossDomainStartState;
      }
@@ -627,15 +626,15 @@
    {
      // startDraftCN is between first and potential last and has never
      // been returned yet
      if (changelogDB.isEmpty())
      if (cnIndexDB.isEmpty())
      {
        isEndOfDraftCNReached = true;
        return null;
      }
      final int lastKey = changelogDB.getLastDraftCN();
      crossDomainStartState = changelogDB.getPreviousCookie(lastKey);
      changelogDBIter = changelogDB.generateIterator(lastKey);
      final int lastKey = cnIndexDB.getLastDraftCN();
      crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
      return crossDomainStartState;
      // TODO:ECL ... ok we'll start from the end of the draftCNDb BUT ...
@@ -909,10 +908,10 @@
  private void releaseIterator()
  {
    if (this.changelogDBIter != null)
    if (this.cnIndexDBCursor != null)
    {
      this.changelogDBIter.close();
      this.changelogDBIter = null;
      this.cnIndexDBCursor.close();
      this.cnIndexDBCursor = null;
    }
  }
@@ -1371,8 +1370,8 @@
      // the next change from the DraftCN db
      CSN csnFromDraftCNDb = changelogDBIter.getCSN();
      String dnFromDraftCNDb = changelogDBIter.getBaseDN();
      CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN();
      String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate generating draftCN "
@@ -1387,10 +1386,10 @@
      {
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
              + " assigning draftCN=" + changelogDBIter.getDraftCN()
              + " assigning draftCN=" + cnIndexDBCursor.getDraftCN()
              + " to change=" + oldestChange);
        oldestChange.setDraftChangeNumber(changelogDBIter.getDraftCN());
        oldestChange.setDraftChangeNumber(cnIndexDBCursor.getDraftCN());
        return true;
      }
@@ -1419,12 +1418,12 @@
              + " will skip " + csnFromDraftCNDb
              + " and read next change from the DraftCNDb.");
        isEndOfDraftCNReached = !changelogDBIter.next();
        isEndOfDraftCNReached = !cnIndexDBCursor.next();
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
              + " has skipped to " + " sn=" + changelogDBIter.getDraftCN()
              + " csn=" + changelogDBIter.getCSN()
              + " has skipped to " + " sn=" + cnIndexDBCursor.getDraftCN()
              + " csn=" + cnIndexDBCursor.getCSN()
              + " End of draftCNDb ?" + isEndOfDraftCNReached);
      }
      catch (ChangelogException e)
@@ -1454,8 +1453,7 @@
    // store in changelogDB the pair
    // (DraftCN of the current change, state before this change)
    ChangelogDB changelogDB = replicationServer.getChangelogDB();
    changelogDB.add(
    replicationServer.getChangeNumberIndexDB().add(
        change.getDraftChangeNumber(),
        previousCookie.toString(),
        change.getBaseDN(),
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,7 +52,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -146,21 +146,21 @@
  /**
   * The handler of the changelog database, the database stores the relation
   * between a draft change number ('seqnum') and the associated cookie.
   * between a change number and the associated cookie.
   * <p>
   * Guarded by changelogDBLock
   * Guarded by cnIndexDBLock
   */
  private ChangelogDB changelogDB;
  private ChangeNumberIndexDB cnIndexDB;
  /**
   * The last value generated of the draft change number.
   * <p>
   * Guarded by changelogDBLock
   * Guarded by cnIndexDBLock
   **/
  private int lastGeneratedDraftCN = 0;
  /** Used for protecting changelogDB related state. */
  private final Object changelogDBLock = new Object();
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
  /**
   * The tracer object for the debug logger.
@@ -705,11 +705,11 @@
      eclwe.finalizeWorkflowElement();
    }
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      if (changelogDB != null)
      if (cnIndexDB != null)
      {
        changelogDB.shutdown();
        cnIndexDB.shutdown();
      }
    }
  }
@@ -830,7 +830,7 @@
      listenThread.interrupt();
    }
    // shutdown all the ChangelogCaches
    // shutdown all the replication domains
    for (ReplicationServerDomain domain : getReplicationServerDomains())
    {
      domain.shutdown();
@@ -890,13 +890,13 @@
      }
    }
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      if (changelogDB != null)
      if (cnIndexDB != null)
      {
        try
        {
          changelogDB.clear(baseDn);
          cnIndexDB.clear(baseDn);
        }
        catch (Exception ignored)
        {
@@ -908,7 +908,7 @@
        try
        {
          lastGeneratedDraftCN = changelogDB.getLastDraftCN();
          lastGeneratedDraftCN = cnIndexDB.getLastDraftCN();
        }
        catch (Exception ignored)
        {
@@ -1369,13 +1369,13 @@
      rsd.clearDbs();
    }
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      if (changelogDB != null)
      if (cnIndexDB != null)
      {
        try
        {
          changelogDB.clear();
          cnIndexDB.clear();
        }
        catch (Exception ignored)
        {
@@ -1387,7 +1387,7 @@
        try
        {
          changelogDB.shutdown();
          cnIndexDB.shutdown();
        }
        catch (Exception ignored)
        {
@@ -1398,7 +1398,7 @@
        }
        lastGeneratedDraftCN = 0;
        changelogDB = null;
        cnIndexDB = null;
      }
    }
  }
@@ -1623,24 +1623,25 @@
  }
  /**
   * Get (or create) a handler on the ChangelogDB for external changelog.
   * Get (or create) a handler on the {@link ChangeNumberIndexDB} for external
   * changelog.
   *
   * @return the handler.
   * @throws DirectoryException
   *           when needed.
   */
  public ChangelogDB getChangelogDB() throws DirectoryException
  public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
  {
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      try
      {
        if (changelogDB == null)
        if (cnIndexDB == null)
        {
          changelogDB = new DraftCNDbHandler(this, this.dbEnv);
          cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
          lastGeneratedDraftCN = getLastDraftChangeNumber();
        }
        return changelogDB;
        return cnIndexDB;
      }
      catch (Exception e)
      {
@@ -1658,11 +1659,11 @@
   */
  public int getFirstDraftChangeNumber()
  {
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      if (changelogDB != null)
      if (cnIndexDB != null)
      {
        return changelogDB.getFirstDraftCN();
        return cnIndexDB.getFirstDraftCN();
      }
      return 0;
    }
@@ -1674,11 +1675,11 @@
   */
  public int getLastDraftChangeNumber()
  {
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      if (changelogDB != null)
      if (cnIndexDB != null)
      {
        return changelogDB.getLastDraftCN();
        return cnIndexDB.getLastDraftCN();
      }
      return 0;
    }
@@ -1690,7 +1691,7 @@
   */
  public int getNewDraftCN()
  {
    synchronized (changelogDBLock)
    synchronized (cnIndexDBLock)
    {
      return ++lastGeneratedDraftCN;
    }
@@ -1730,10 +1731,9 @@
    int lastDraftCN;
    boolean dbEmpty = false;
    long newestDate = 0;
    final ChangelogDB changelogDB = getChangelogDB();
    final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
    int firstDraftCN = changelogDB.getFirstDraftCN();
    int firstDraftCN = cnIndexDB.getFirstDraftCN();
    Map<String,ServerState> domainsServerStateForLastSeqnum = null;
    CSN csnForLastSeqnum = null;
    String domainForLastSeqnum = null;
@@ -1745,11 +1745,11 @@
    }
    else
    {
      lastDraftCN = changelogDB.getLastDraftCN();
      lastDraftCN = cnIndexDB.getLastDraftCN();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
      String lastSeqnumGenState = changelogDB.getPreviousCookie(lastDraftCN);
      String lastSeqnumGenState = cnIndexDB.getPreviousCookie(lastDraftCN);
      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
      {
        domainsServerStateForLastSeqnum = MultiDomainServerState.
@@ -1757,13 +1757,13 @@
      }
      // Get the CSN associated with the current last DraftCN
      csnForLastSeqnum = changelogDB.getCSN(lastDraftCN);
      csnForLastSeqnum = cnIndexDB.getCSN(lastDraftCN);
      // Get the domain associated with the current last DraftCN
      domainForLastSeqnum = changelogDB.getBaseDN(lastDraftCN);
      domainForLastSeqnum = cnIndexDB.getBaseDN(lastDraftCN);
    }
    // Domain by domain
    long newestDate = 0;
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      if (contains(excludedBaseDNs, rsd.getBaseDn()))
@@ -1810,6 +1810,7 @@
      if ((ec>0) && (firstDraftCN==0))
        firstDraftCN = 1;
    }
    if (dbEmpty)
    {
      // The database was empty, just keep increasing numbers since last time
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -29,13 +29,18 @@
import org.opends.server.replication.common.CSN;
/**
 * This class stores the changelog information into a database.
 * This class stores an index of all the changes seen by this server. The index
 * is sorted by a global ordering as defined in the CSN class. The index links a
 * <code>changeNumber</code> to the corresponding {@link CSN}. The {@link CSN}
 * then links to a corresponding change in one of the {@link ReplicaDB}s.
 *
 * @see <a href=
 * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
 * >OpenDJ Domain Names</a> for more information about the changelog.
 * @see <a href= "http://tools.ietf.org/html/draft-good-ldap-changelog-04"
 * >OpenDJ Domain Names</a> for more information about the changeNumber.
 */
public interface ChangelogDB extends Runnable
public interface ChangeNumberIndexDB extends Runnable
{
  /**
@@ -98,9 +103,9 @@
  void add(int draftCN, String previousCookie, String baseDN, CSN csn);
  /**
   * Generate a new {@link ChangelogDBIterator} that allows to browse the db
   * managed by this dbHandler and starting at the position defined by a given
   * changeNumber.
   * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
   * db managed by this dbHandler and starting at the position defined by a
   * given changeNumber.
   *
   * @param startDraftCN
   *          The position where the iterator must start.
@@ -110,7 +115,7 @@
   * @throws ChangelogException
   *           if a database problem happened.
   */
  ChangelogDBIterator generateIterator(int startDraftCN)
  ChangeNumberIndexDBCursor getCursorFrom(int startDraftCN)
      throws ChangelogException;
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDBIterator.java
@@ -35,7 +35,7 @@
 * ChangelogDBIterator must be closed to release all the resources into the
 * database.
 */
public interface ChangelogDBIterator extends Closeable
public interface ChangeNumberIndexDBCursor extends Closeable
{
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -44,9 +44,7 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogDBIterator;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -62,13 +60,13 @@
 * server in the topology.
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * This class is also able to generate a {@link ChangelogDBIterator} that can be
 * used to read all changes from a given draft ChangeNumber.
 * This class is also able to generate a {@link ChangeNumberIndexDBCursor} that
 * can be used to read all changes from a given draft ChangeNumber.
 * <p>
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
public class DraftCNDbHandler implements ChangelogDB
public class DraftCNDbHandler implements ChangeNumberIndexDB
{
  /**
   * The tracer object for the debug logger.
@@ -217,7 +215,7 @@
  /** {@inheritDoc} */
  @Override
  public ChangelogDBIterator generateIterator(int startDraftCN)
  public ChangeNumberIndexDBCursor getCursorFrom(int startDraftCN)
      throws ChangelogException
  {
    return new DraftCNDbIterator(db, startDraftCN);
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
@@ -30,10 +30,8 @@
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ChangelogDBIterator;
import org.opends.server.replication.server.changelog.je.DraftCNDB
    .DraftCNDBCursor;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.DebugLogLevel;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -42,7 +40,7 @@
 * This class allows to iterate through the changes received from a given
 * LDAP Server Identifier.
 */
public class DraftCNDbIterator implements ChangelogDBIterator
public class DraftCNDbIterator implements ChangeNumberIndexDBCursor
{
  private static final DebugTracer TRACER = getTracer();
  private DraftCNDBCursor draftCNDbCursor;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -1656,7 +1656,7 @@
        LDAPFilter.decode(filterString),
        attributes);
  }
  /**
   * Test parallel simultaneous psearch with different filters.
   */
@@ -2678,22 +2678,23 @@
  {
    String tn = "ECLPurgeDraftCNDbAfterChangelogClear";
    debugInfo(tn, "Starting test\n\n");
    DraftCNDbHandler draftdb =
        (DraftCNDbHandler) replicationServer.getChangeNumberIndexDB();
    assertEquals(draftdb.count(), 8);
    draftdb.setPurgeDelay(1000);
    // Now clear the changelog db
    this.replicationServer.clearDb();
    // Expect changes purged from the changelog db to be sometimes
    // also purged from the DraftCNDb.
    while (!draftdb.isEmpty())
    {
      DraftCNDbHandler draftdb = (DraftCNDbHandler) replicationServer.getChangelogDB();
      assertEquals(draftdb.count(), 8);
      draftdb.setPurgeDelay(1000);
      // Now Purge the changelog db
      this.replicationServer.clearDb();
      // Expect changes purged from the changelog db to be sometimes
      // also purged from the DraftCNDb.
      while (!draftdb.isEmpty())
      {
        debugInfo(tn, "draftdb.count="+draftdb.count());
        sleep(200);
      }
      debugInfo(tn, "draftdb.count=" + draftdb.count());
      sleep(200);
    }
    debugInfo(tn, "Ending test with success");
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -36,7 +36,7 @@
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogDBIterator;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor;
import org.opends.server.util.StaticUtils;
@@ -247,14 +247,14 @@
      assertEquals(handler.getPreviousCookie(sn2),value2);
      assertEquals(handler.getPreviousCookie(sn3),value3);
      ChangelogDBIterator it = handler.generateIterator(sn1);
      assertIteratorReadsInOrder(it, sn1, sn2, sn3);
      ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(sn1);
      assertCursorReadsInOrder(cursor, sn1, sn2, sn3);
      it = handler.generateIterator(sn2);
      assertIteratorReadsInOrder(it, sn2, sn3);
      cursor = handler.getCursorFrom(sn2);
      assertCursorReadsInOrder(cursor, sn2, sn3);
      it = handler.generateIterator(sn3);
      assertIteratorReadsInOrder(it, sn3);
      cursor = handler.getCursorFrom(sn3);
      assertCursorReadsInOrder(cursor, sn3);
      handler.clear();
@@ -274,21 +274,21 @@
    }
  }
  private void assertIteratorReadsInOrder(ChangelogDBIterator it, int... sns)
      throws ChangelogException
  private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor,
      int... sns) throws ChangelogException
  {
    try
    {
      for (int i = 0; i < sns.length; i++)
      {
        assertEquals(it.getDraftCN(), sns[i]);
        assertEquals(cursor.getDraftCN(), sns[i]);
        final boolean isNotLast = i + 1 < sns.length;
        assertEquals(it.next(), isNotLast);
        assertEquals(cursor.next(), isNotLast);
      }
    }
    finally
    {
      it.close();
      cursor.close();
    }
  }
}