mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.45.2013 9d5b1c7a628471604be4768f97fcdaf13cf0639f
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -41,7 +44,9 @@
import org.opends.server.types.DirectoryException;
import com.sleepycat.je.*;
import com.sleepycat.je.config.EnvironmentParams;
import static com.sleepycat.je.EnvironmentConfig.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -51,19 +56,19 @@
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used to represent a Db environment that can be used
 * to create ReplicationDB.
 * This class represents a DB environment that acts as a factory for
 * ReplicationDBs.
 */
public class ReplicationDbEnv
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
  /**
   * The tracer object for the debug logger.
   */
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
@@ -92,9 +97,9 @@
       */
      envConfig.setAllowCreate(true);
      envConfig.setTransactional(true);
      envConfig.setConfigParam("je.cleaner.threads", "2");
      envConfig.setConfigParam("je.checkpointer.highPriority", "true");
      envConfig.setConfigParam(STATS_COLLECT, "false");
      envConfig.setConfigParam(CLEANER_THREADS, "2");
      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
      /*
       * Tests have shown that since the parsing of the Replication log is
       * always done sequentially, it is not necessary to use a large DB cache.
@@ -106,15 +111,15 @@
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam("je.maxMemory", mb(16));
        envConfig.setConfigParam(MAX_MEMORY, mb(16));
      }
      else
      {
@@ -122,7 +127,7 @@
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam("je.maxMemory", mb(5));
        envConfig.setConfigParam(MAX_MEMORY, mb(5));
      }
      // Since records are always added at the end of the Replication log and
@@ -160,12 +165,27 @@
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws RuntimeException
  private Database openDatabase(String databaseName) throws ChangelogException,
      RuntimeException
  {
    if (isShuttingDown.get())
    {
      // TODO JNR i18n
      throw new ChangelogException(Message.raw("DB is closing"));
    }
    final DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    return dbEnvironment.openDatabase(null, databaseName, dbConfig);
    final Database db =
        dbEnvironment.openDatabase(null, databaseName, dbConfig);
    if (isShuttingDown.get())
    {
      closeDB(db);
      // TODO JNR i18n
      throw new ChangelogException(Message.raw("DB is closing"));
    }
    allDbs.add(db);
    return db;
  }
  /**
@@ -395,44 +415,69 @@
      }
    }
    /**
     * Shutdown the Db environment.
     */
    public void shutdown()
  /**
   * Shutdown the Db environment.
   */
  public void shutdown()
  {
    isShuttingDown.set(true);
    // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
    // This code rely on openDatabase() to close databases opened concurrently
    // with this code
    final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
    allDbs.clear();
    for (Database db : allDbsCopy)
    {
      try
      {
        changelogStateDb.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      try
      {
        dbEnvironment.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      closeDB(db);
    }
    try
    {
      dbEnvironment.close();
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(e));
    }
  }
  private void closeDB(Database db)
  {
    allDbs.remove(db);
    try
    {
      db.close();
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(e));
    }
  }
  private Message newErrorMessage(DatabaseException e)
  {
    if (!isShuttingDown.get())
    {
      return NOTE_EXCEPTION_CLOSING_DATABASE
          .get(stackTraceToSingleLineString(e));
    }
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
    mb.append(" ");
    mb.append(stackTraceToSingleLineString(e));
    return mb.toMessage();
  }
  /**
   * Clears the provided generationId associated to the provided baseDN from the
   * state Db.
   *
   * @param baseDN
   *          The baseDN for which the generationID must be cleared.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clearGenerationId(DN baseDN)
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildGenIdKey(baseDN),
        "clearGenerationId(baseDN=" + baseDN + ")");
@@ -446,15 +491,17 @@
   *          The baseDN for which the serverId must be cleared.
   * @param serverId
   *          The serverId to remove from the Db.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clearServerId(DN baseDN, int serverId)
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(String keyString,
      String methodInvocation)
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
      debug(methodInvocation + " starting");
@@ -490,18 +537,23 @@
    }
    catch (RuntimeException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
      throw new ChangelogException(dbe);
    }
  }
    /**
     * Clears the database.
     *
     * @param databaseName The name of the database to clear.
     * @param db
     *          The database to clear.
     */
    public final void clearDb(String databaseName)
    public final void clearDb(Database db)
    {
      String databaseName = db.getDatabaseName();
      // Closing is requested by Berkeley JE before truncate
      db.close();
      Transaction txn = null;
      try
      {