mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
29.56.2013 836356c02790df187234bc6f42ed4ddd2dfb2d75
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -39,6 +39,9 @@
import com.sleepycat.je.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -51,7 +54,7 @@
public class ReplicationDbEnv
{
  private Environment dbEnvironment;
  private Database stateDb;
  private Database changelogStateDb;
  private ReplicationServer replicationServer;
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
@@ -100,19 +103,15 @@
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize",
            String.valueOf(2 * 1024 * 1024));
        envConfig.setConfigParam("je.log.iteratorReadSize",
            String.valueOf(2 * 1024 * 1024));
        envConfig.setConfigParam("je.log.faultReadSize",
            String.valueOf(4 * 1024));
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam("je.maxMemory",
            String.valueOf(16 * 1024 * 1024));
        envConfig.setConfigParam("je.maxMemory", mb(16));
      }
      else
      {
@@ -120,8 +119,7 @@
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam("je.maxMemory",
            String.valueOf(5 * 1024 * 1024));
        envConfig.setConfigParam("je.maxMemory", mb(5));
      }
      // Since records are always added at the end of the Replication log and
@@ -141,7 +139,7 @@
       * the topology. The database "changelogstate" is used to store the list
       * of all the servers that have been seen in the past.
       */
      stateDb = openDatabase("changelogstate");
      changelogStateDb = openDatabase("changelogstate");
    }
    catch (RuntimeException e)
    {
@@ -149,6 +147,16 @@
    }
  }
  private String kb(int sizeInKb)
  {
    return String.valueOf(sizeInKb * 1024);
  }
  private String mb(int sizeInMb)
  {
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws RuntimeException
  {
    final DatabaseConfig dbConfig = new DatabaseConfig();
@@ -163,34 +171,31 @@
   *
   * @throws ChangelogException in case of underlying Exception
   */
  public void start() throws ChangelogException
  public void initializeFromChangelogStateDB() throws ChangelogException
  {
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    Cursor cursor = stateDb.openCursor(null, null);
    Cursor cursor = changelogStateDb.openCursor(null, null);
    try
    {
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        String stringData = toString(data.getData());
        final String stringData = toString(data.getData());
        if (debugEnabled())
          TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
              + " Read (tag/baseDn/generationId) OR (serverId/baseDN): "
              + stringData);
          debug("read (" + GENERATION_ID_TAG + " generationId baseDn) OR "
              + "(serverId baseDN): " + stringData);
        String[] str = stringData.split(FIELD_SEPARATOR, 3);
        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
        if (str[0].equals(GENERATION_ID_TAG))
        {
          long generationId = toLong(str[1]);
          String baseDn = str[2];
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                + " Has read baseDn=" + baseDn + " generationId="
                + generationId);
            debug("has read baseDn=" + baseDn + " generationId=" +generationId);
          replicationServer.initDomainGenerationID(baseDn, generationId);
        }
@@ -200,8 +205,7 @@
          String baseDn = str[1];
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                + " Has read: baseDn=" + baseDn + " serverId=" + serverId);
            debug("has read: baseDn=" + baseDn + " serverId=" + serverId);
          replicationServer.addServerIdToDomain(serverId, baseDn);
        }
@@ -264,73 +268,91 @@
    }
  }
    /**
     * Finds or creates the database used to store changes from the server
     * with the given serverId and the given baseDn.
     *
     * @param serverId     The server id that identifies the server.
     * @param baseDn       The baseDn that identifies the domain.
     * @param generationId The generationId associated to this domain.
     * @return the Database.
     * @throws ChangelogException in case of underlying Exception.
     */
    public Database getOrAddDb(int serverId, String baseDn, long generationId)
        throws ChangelogException
    {
      if (debugEnabled())
        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
          serverId + " " + baseDn + " " + generationId);
      try
      {
        final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn;
        // Opens the database for the changes received from this server
        // on this domain. Create it if it does not already exist.
        Database db = openDatabase(serverIdKey);
        // Creates the record serverId/domain base Dn in the stateDb
        // if it does not already exist.
        putInStateDBIfNotExist(serverIdKey, serverIdKey);
        // Creates the record domain base Dn/ generationId in the stateDb
        // if it does not already exist.
        final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        final String genIdData = GENERATION_ID_TAG
            + FIELD_SEPARATOR + generationId
            + FIELD_SEPARATOR + baseDn;
        putInStateDBIfNotExist(genIdKey, genIdData);
        return db;
      }
      catch (RuntimeException e)
      {
        throw new ChangelogException(e);
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
        return null;
      }
    }
  private void putInStateDBIfNotExist(String keyString, String dataString)
      throws UnsupportedEncodingException, RuntimeException
  private byte[] toBytes(String s)
  {
    byte[] byteId = keyString.getBytes("UTF-8");
    byte[] dataByteId = dataString.getBytes("UTF-8");
    DatabaseEntry key = new DatabaseEntry();
    key.setData(byteId);
    try
    {
      return s.getBytes("UTF-8");
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
      return null;
    }
  }
  /**
   * Finds or creates the database used to store changes from the server with
   * the given serverId and the given baseDn.
   *
   * @param serverId
   *          The server id that identifies the server.
   * @param baseDn
   *          The baseDn that identifies the domain.
   * @param generationId
   *          The generationId associated to this domain.
   * @return the Database.
   * @throws ChangelogException
   *           in case of underlying Exception.
   */
  public Database getOrAddDb(int serverId, String baseDn, long generationId)
      throws ChangelogException
  {
    if (debugEnabled())
      debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDn + ", "
          + generationId + ")");
    try
    {
      // JNR: redundant info is stored between the key and data down below.
      // It is probably ok since "changelogstate" DB does not receive a high
      // volume of inserts.
      final String serverIdToBaseDn = buildServerIdKey(baseDn, serverId);
      // Opens the DB for the changes received from this server on this domain.
      Database db = openDatabase(serverIdToBaseDn);
      putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
      putInChangelogStateDBIfNotExist(buildGenIdKey(baseDn),
                                      buildGenIdData(baseDn, generationId));
      return db;
    }
    catch (RuntimeException e)
    {
      throw new ChangelogException(e);
    }
  }
  private String buildGenIdKey(String baseDn)
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
  }
  private String buildServerIdKey(String baseDn, int serverId)
  {
    return serverId + FIELD_SEPARATOR + baseDn;
  }
  private String buildGenIdData(String baseDn, long generationId)
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
        + baseDn;
  }
  private void putInChangelogStateDBIfNotExist(String keyString,
      String dataString) throws RuntimeException
  {
    DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
    if (status == OperationStatus.NOTFOUND)
    if (changelogStateDb.get(null, key, data, DEFAULT) == NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(dataByteId);
        data.setData(toBytes(dataString));
        if (debugEnabled())
          TRACER.debugInfo("Created in the state Db record key=[" + keyString
          debug("putting record in the changelogstate Db key=[" + keyString
              + "] value=[" + dataString + "]");
        stateDb.put(txn, key, data);
        changelogStateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
      catch (DatabaseException dbe)
@@ -367,7 +389,7 @@
    {
      try
      {
        stateDb.close();
        changelogStateDb.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
@@ -397,18 +419,8 @@
   */
  public void clearGenerationId(String baseDn)
  {
    String methodInvocation = "clearGenerationId(baseDN=" + baseDn + ")";
    String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
    OperationStatus status = deleteFromStateDB(key, methodInvocation);
    if (status == OperationStatus.SUCCESS || status == OperationStatus.KEYEXIST)
    {
      // TODO : should have a better error logging
      if (debugEnabled())
        TRACER.debugInfo("In "
            + this.replicationServer.getMonitorInstanceName()
            + methodInvocation + " failed " + status);
    }
    deleteFromChangelogStateDB(buildGenIdKey(baseDn),
        "clearGenerationId(baseDN=" + baseDn + ")");
  }
  /**
@@ -422,36 +434,30 @@
   */
  public void clearServerId(String baseDn, int serverId)
  {
    String key = serverId + FIELD_SEPARATOR + baseDn;
    deleteFromStateDB(key, "clearServerId(baseDN=" + baseDn + " , serverId="
        + serverId + ")");
    deleteFromChangelogStateDB(buildServerIdKey(baseDn, serverId),
        "clearServerId(baseDN=" + baseDn + " , serverId=" + serverId + ")");
  }
  private OperationStatus deleteFromStateDB(String keyString,
  private void deleteFromChangelogStateDB(String keyString,
      String methodInvocation)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " " + methodInvocation);
      debug(methodInvocation + " starting");
    try
    {
      final byte[] byteId = keyString.getBytes("UTF-8");
      final DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
      final DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status != OperationStatus.NOTFOUND)
      OperationStatus status = changelogStateDb.get(null, key, data, DEFAULT);
      if (status == SUCCESS)
      {
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try
        {
          stateDb.delete(txn, key);
          changelogStateDb.delete(txn, key);
          txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          if (debugEnabled())
            TRACER.debugInfo(" In "
                + this.replicationServer.getMonitorInstanceName() + " "
                + methodInvocation + " succeeded " + status);
            debug(methodInvocation + " succeeded");
        }
        catch (RuntimeException dbe)
        {
@@ -460,18 +466,18 @@
          throw dbe;
        }
      }
      return status;
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
      else
      {
        if (debugEnabled())
          debug(methodInvocation + " failed: key=[ " + keyString
              + "] not found");
      }
    }
    catch (RuntimeException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
    }
    return null;
  }
    /**
@@ -569,4 +575,10 @@
    replicationServer.shutdown();
  }
  private void debug(String message)
  {
    TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ", "
        + message);
  }
}