mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.30.2013 cc21522ea71015c75a82ebf285644fa5ff57f46a
OPENDJ-1116 Introduce abstraction for the changelog DB


Moved the cnIndexDB + related code to the ChangelogDB implementation because it falls under its responsibility.


ReplicationServer.java:
Moved cnIndexDB + related code to JEChangelogDB.
Moved clearGenerationId() code to JEChangelogDB.removeDomain().

ReplicationServerDomain.java:
In clearDbs(), removed the call to ReplicationServer.clearGenerationId() (that code now lives in JEChangelogDB.removeDomain()).


ChangelogDB.java:
Temporarily added clearCNIndexDB until I can sort out the can of worms created by folding this method into other methods.
Renamed newChangeNumberIndexDB() to getChangeNumberIndexDB().

JEChangelogDB.java:
Moved cnIndexDB + related code here from ReplicationServer.
4 files modified
243 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 113 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 106 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -65,7 +65,6 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.types.ResultCode.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
@@ -143,17 +142,6 @@
  private long monitoringPublisherPeriod = 3000;
  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * Guarded by cnIndexDBLock
   */
  private ChangeNumberIndexDB cnIndexDB;
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
@@ -645,29 +633,6 @@
      DirectoryServer.deregisterWorkflowElement(eclwe);
      eclwe.finalizeWorkflowElement();
    }
    shutdownCNIndexDB();
  }
  private void shutdownCNIndexDB()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.shutdown();
        }
        catch (ChangelogException ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
      }
    }
  }
  /**
@@ -802,34 +767,6 @@
  }
  /**
   * Clears the generationId for the replicationServerDomain related to the
   * provided baseDN.
   *
   * @param baseDN
   *          The baseDN for which to delete the generationId.
   */
  public void clearGenerationId(DN baseDN)
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.clear(baseDN);
        }
        catch (Exception ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
      }
    }
  }
  /**
   * Retrieves the time after which changes must be deleted from the
   * persistent storage (in milliseconds).
   *
@@ -1282,27 +1219,7 @@
      rsd.clearDbs();
    }
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.clear();
        }
        catch (Exception ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
        shutdownCNIndexDB();
        cnIndexDB = null;
      }
    }
    this.changelogDB.clearCNIndexDB();
  }
  /**
@@ -1508,29 +1425,10 @@
   * changelog.
   *
   * @return the handler.
   * @throws DirectoryException
   *           when needed.
   */
  ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
  ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    synchronized (cnIndexDBLock)
    {
      try
      {
        if (cnIndexDB == null)
        {
          cnIndexDB = this.changelogDB.newChangeNumberIndexDB();
        }
        return cnIndexDB;
      }
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        Message message =
            ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage());
        throw new DirectoryException(OPERATIONS_ERROR, message, e);
      }
    }
    return this.changelogDB.getChangeNumberIndexDB();
  }
  /**
@@ -1567,14 +1465,13 @@
     *     replchangelog FROM that genState TO the crossDomainEligibleCSN
     *     (this diff is done domain by domain)
     */
    final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
    try
    {
      boolean dbEmpty = true;
      long firstChangeNumber = 0;
      long lastChangeNumber = 0;
      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
      final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord();
      final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
@@ -1657,7 +1554,7 @@
      {
        // The database was empty, just keep increasing numbers since last time
        // we generated one change number.
        long lastGeneratedCN = this.cnIndexDB.getLastGeneratedChangeNumber();
        long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
        firstChangeNumber += lastGeneratedCN;
        lastChangeNumber += lastGeneratedCN;
      }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2202,18 +2202,7 @@
   */
  public void clearDbs()
  {
    // Reset the localchange and state db for the current domain
    changelogDB.removeDomain(baseDN);
    try
    {
      localReplicationServer.clearGenerationId(baseDN);
    }
    catch (Exception e)
    {
      // TODO: i18n
      logError(Message.raw("Exception caught while clearing generationId:"
          + e.getLocalizedMessage()));
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -84,18 +84,21 @@
  void shutdownDB();
  /**
   * Temporary method added here until I (JNR) can find a way to get rid of it.
   */
  void clearCNIndexDB();
  /**
   * Removes the changelog database directory.
   */
  void removeDB();
  /**
   * Returns a new {@link ChangeNumberIndexDB} object.
   * Returns the {@link ChangeNumberIndexDB} object.
   *
   * @return a new {@link ChangeNumberIndexDB} object
   * @throws ChangelogException
   *           If a database problem happened
   * @return the {@link ChangeNumberIndexDB} object
   */
  ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException;
  ChangeNumberIndexDB getChangeNumberIndexDB();
  // Domain methods
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -70,6 +70,17 @@
  private final String dbDirectoryName;
  private final File dbDirectory;
  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * Guarded by cnIndexDBLock
   */
  private ChangeNumberIndexDB cnIndexDB;
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
  /** The local replication server. */
  private final ReplicationServer replicationServer;
@@ -229,10 +240,8 @@
    }
    catch (ChangelogException e)
    {
      Message message =
          ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e
              .getLocalizedMessage());
      logError(message);
      logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(),
          e.getLocalizedMessage()));
    }
  }
@@ -255,10 +264,33 @@
    }
  }
  private void shutdownCNIndexDB()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.shutdown();
        }
        catch (ChangelogException ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public void shutdownDB()
  {
    shutdownCNIndexDB();
    if (dbEnv != null)
    {
      dbEnv.shutdown();
@@ -267,6 +299,33 @@
  /** {@inheritDoc} */
  @Override
  public void clearCNIndexDB()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.clear();
        }
        catch (Exception ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
        shutdownCNIndexDB();
        cnIndexDB = null;
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public void removeDB()
  {
    StaticUtils.recursiveDelete(dbDirectory);
@@ -354,6 +413,7 @@
  @Override
  public void removeDomain(DN baseDN)
  {
    // 1- clear the replica DBs
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
    {
@@ -376,6 +436,26 @@
      shutdownDbHandlers(domainMap);
    }
    // 2- clear the ChangeNumber index DB
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.clear(baseDN);
        }
        catch (Exception ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
      }
    }
    // 3- clear the changelogstate DB
    try
    {
      dbEnv.clearGenerationId(baseDN);
@@ -446,9 +526,23 @@
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    return new DraftCNDbHandler(replicationServer, this.dbEnv);
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB == null)
      {
        try
        {
          cnIndexDB = new DraftCNDbHandler(replicationServer, this.dbEnv);
        }
        catch (Exception e)
        {
          logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
        }
      }
      return cnIndexDB;
    }
  }
  /** {@inheritDoc} */