mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.22.2013 45a8024fe68e7bc451a5a22afcaf31e7edb745a1
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
@@ -39,10 +34,15 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import com.sleepycat.je.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used to represent a Db environment that can be used
 * to create ReplicationDB.
@@ -67,95 +67,99 @@
   * @param path Path where the backing files must be created.
   * @param replicationServer the ReplicationServer that creates this
   *                          ReplicationDbEnv.
   * @throws DatabaseException If a DatabaseException occurred that prevented
   * @throws ChangelogException If an Exception occurred that prevented
   *                           the initialization to happen.
   * @throws ReplicationDBException If a replicationServer internal error caused
   *                              a failure of the replicationServer processing.
   */
  public ReplicationDbEnv(String path, ReplicationServer replicationServer)
         throws DatabaseException, ReplicationDBException
      throws ChangelogException
  {
    this.replicationServer = replicationServer;
    EnvironmentConfig envConfig = new EnvironmentConfig();
    /* Create the DB Environment that will be used for all
     * the ReplicationServer activities related to the db
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam("je.cleaner.threads", "2");
    envConfig.setConfigParam("je.checkpointer.highPriority", "true");
    /*
     * Tests have shown that since the parsing of the Replication log is always
     * done sequentially, it is not necessary to use a large DB cache.
     */
    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
    try
    {
      /*
       * If the JVM is reasonably large then we can safely default to bigger
       * read buffers. This will result in more scalable checkpointer and
       * cleaner performance.
       */
      envConfig.setConfigParam("je.cleaner.lookAheadCacheSize",
          String.valueOf(2 * 1024 * 1024));
      envConfig.setConfigParam("je.log.iteratorReadSize",
          String.valueOf(2 * 1024 * 1024));
      envConfig
          .setConfigParam("je.log.faultReadSize", String.valueOf(4 * 1024));
      EnvironmentConfig envConfig = new EnvironmentConfig();
      /*
       * The cache size must be bigger in order to accommodate the larger
       * buffers - see OPENDJ-943.
       * Create the DB Environment that will be used for all the
       * ReplicationServer activities related to the db
       */
      envConfig
          .setConfigParam("je.maxMemory", String.valueOf(16 * 1024 * 1024));
      envConfig.setAllowCreate(true);
      envConfig.setTransactional(true);
      envConfig.setConfigParam("je.cleaner.threads", "2");
      envConfig.setConfigParam("je.checkpointer.highPriority", "true");
      /*
       * Tests have shown that since the parsing of the Replication log is
       * always done sequentially, it is not necessary to use a large DB cache.
       */
      if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
      {
        /*
         * If the JVM is reasonably large then we can safely default to bigger
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize",
            String.valueOf(2 * 1024 * 1024));
        envConfig.setConfigParam("je.log.iteratorReadSize",
            String.valueOf(2 * 1024 * 1024));
        envConfig.setConfigParam("je.log.faultReadSize",
            String.valueOf(4 * 1024));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam("je.maxMemory",
            String.valueOf(16 * 1024 * 1024));
      }
      else
      {
        /*
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam("je.maxMemory",
            String.valueOf(5 * 1024 * 1024));
      }
      // Since records are always added at the end of the Replication log and
      // deleted at the beginning of the Replication log, this should never
      // cause any deadlock.
      envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
      envConfig.setLockTimeout(0, TimeUnit.SECONDS);
      // Since replication provides durability, we can reduce the DB durability
      // level so that we are immune to application / JVM crashes.
      envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
      dbEnvironment = new Environment(new File(path), envConfig);
      /*
       * One database is created to store the update from each LDAP server in
       * the topology. The database "changelogstate" is used to store the list
       * of all the servers that have been seen in the past.
       */
      DatabaseConfig dbConfig = new DatabaseConfig();
      dbConfig.setAllowCreate(true);
      dbConfig.setTransactional(true);
      stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
      start();
    }
    else
    catch (RuntimeException e)
    {
      /*
       * Use 5M so that the replication can be used with 64M total for the JVM.
       */
      envConfig.setConfigParam("je.maxMemory", String.valueOf(5 * 1024 * 1024));
      throw new ChangelogException(e);
    }
    // Since records are always added at the end of the Replication log and
    // deleted at the beginning of the Replication log, this should never
    // cause any deadlock.
    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
    // Since replication provides durability, we can reduce the DB durability
    // level so that we are immune to application / JVM crashes.
    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
    dbEnvironment = new Environment(new File(path), envConfig);
    /*
     * One database is created to store the update from each LDAP
     * server in the topology.
     * The database "changelogstate" is used to store the list of all
     * the servers that have been seen in the past.
     */
    DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
    start();
  }
  /**
   * Read the list of known servers from the database and start dbHandler
   * for each of them.
   *
   * @throws DatabaseException in case of underlying DatabaseException
   * @throws ReplicationDBException when the information from the database
   *                              cannot be decoded correctly.
   * @throws ChangelogException in case of underlying Exception
   */
  private void start() throws DatabaseException, ReplicationDBException
  private void start() throws ChangelogException, DatabaseException
  {
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
@@ -168,23 +172,15 @@
    }
    finally
    {
      try
      {
        cursor.close();
      }
      catch (Exception ignored)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
      }
      close(cursor);
    }
  }
  private void readDomainBaseDNGenerationIDRecords(DatabaseEntry key,
      DatabaseEntry data, Cursor cursor) throws ReplicationDBException
      DatabaseEntry data, Cursor cursor) throws ChangelogException,
      DatabaseException
  {
    /*
     * Get the domain base DN/ generationIDs records
     */
    // Get the domain base DN/ generationIDs records
    OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    {
@@ -214,11 +210,10 @@
  }
  private void readServerIdDomainBaseDNRecords(DatabaseEntry key,
      DatabaseEntry data, Cursor cursor) throws ReplicationDBException
      DatabaseEntry data, Cursor cursor) throws ChangelogException,
      DatabaseException
  {
    /*
     * Get the server Id / domain base DN records
     */
    // Get the server Id / domain base DN records
    OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    {
@@ -252,7 +247,7 @@
    }
  }
  private int toInt(String data) throws ReplicationDBException
  private int toInt(String data) throws ChangelogException
  {
    try
    {
@@ -261,13 +256,13 @@
    {
      // should never happen
      // TODO: i18n
      throw new ReplicationDBException(Message
          .raw("replicationServer state database has a wrong format: "
              + e.getLocalizedMessage() + "<" + data + ">"));
      throw new ChangelogException(Message.raw(
          "replicationServer state database has a wrong format: "
          + e.getLocalizedMessage() + "<" + data + ">"));
    }
  }
  private long toLong(String data) throws ReplicationDBException
  private long toLong(String data) throws ChangelogException
  {
    try
    {
@@ -277,13 +272,13 @@
    {
      // should never happen
      // TODO: i18n
      throw new ReplicationDBException(Message
          .raw("replicationServer state database has a wrong format: "
              + e.getLocalizedMessage() + "<" + data + ">"));
      throw new ChangelogException(Message.raw(
          "replicationServer state database has a wrong format: "
          + e.getLocalizedMessage() + "<" + data + ">"));
    }
  }
  private String toString(byte[] data) throws ReplicationDBException
  private String toString(byte[] data) throws ChangelogException
  {
    try
    {
@@ -293,7 +288,7 @@
    {
      // should never happens
      // TODO: i18n
      throw new ReplicationDBException(Message.raw("need UTF-8 support"));
      throw new ChangelogException(Message.raw("need UTF-8 support"));
    }
  }
@@ -305,10 +300,10 @@
     * @param baseDn       The baseDn that identifies the domain.
     * @param generationId The generationId associated to this domain.
     * @return the Database.
     * @throws DatabaseException in case of underlying Exception.
     * @throws ChangelogException in case of underlying Exception.
     */
    public Database getOrAddDb(int serverId, String baseDn, long generationId)
    throws DatabaseException
        throws ChangelogException
    {
      if (debugEnabled())
        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
@@ -337,6 +332,10 @@
        putInStateDBIfNotExist(genIdKey, genIdData);
        return db;
      }
      catch (RuntimeException e)
      {
        throw new ChangelogException(e);
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
@@ -345,7 +344,7 @@
    }
  private void putInStateDBIfNotExist(String keyString, String dataString)
      throws UnsupportedEncodingException
      throws UnsupportedEncodingException, RuntimeException
  {
    byte[] byteId = keyString.getBytes("UTF-8");
    byte[] dataByteId = dataString.getBytes("UTF-8");
@@ -378,11 +377,18 @@
     * Creates a new transaction.
     *
     * @return the transaction.
     * @throws DatabaseException in case of underlying database Exception.
     * @throws ChangelogException in case of underlying exception
     */
    public Transaction beginTransaction() throws DatabaseException
    public Transaction beginTransaction() throws ChangelogException
    {
      return dbEnvironment.beginTransaction(null, null);
      try
      {
        return dbEnvironment.beginTransaction(null, null);
      }
      catch (RuntimeException e)
      {
        throw new ChangelogException(e);
      }
    }
    /**
@@ -478,7 +484,7 @@
                + this.replicationServer.getMonitorInstanceName() + " "
                + methodInvocation + " succeeded " + status);
        }
        catch (DatabaseException dbe)
        catch (RuntimeException dbe)
        {
          // Abort the txn and propagate the Exception to the caller
          txn.abort();
@@ -491,7 +497,7 @@
    {
      // can't happen
    }
    catch (DatabaseException dbe)
    catch (RuntimeException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
@@ -514,7 +520,7 @@
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
        txn = null;
      }
      catch (DatabaseException e)
      catch (RuntimeException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
@@ -540,10 +546,9 @@
     * TODO:ECL how to manage compatibility of this db with  new domains
     * added or removed ?
     * @return the retrieved or created db.
     * @throws DatabaseException when a problem occurs.
     * @throws ChangelogException when a problem occurs.
     */
    public Database getOrCreateDraftCNDb()
    throws DatabaseException
    public Database getOrCreateDraftCNDb() throws ChangelogException
    {
      String stringId = "draftcndb";
@@ -553,6 +558,13 @@
      dbConfig.setAllowCreate(true);
      dbConfig.setTransactional(true);
      return dbEnvironment.openDatabase(null, stringId, dbConfig);
      try
      {
        return dbEnvironment.openDatabase(null, stringId, dbConfig);
      }
      catch (RuntimeException e)
      {
        throw new ChangelogException(e);
      }
    }
}