mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
29.56.2013 836356c02790df187234bc6f42ed4ddd2dfb2d75
ReplicationDbEnv.java:
Renamed:
- field stateDb to changelogStateDb.
- start() to initializeFromChangelogStateDB().
- putInStateDBIfNotExist() to putInChangelogStateDBIfNotExist()
- deleteFromStateDB() to deleteFromChangelogStateDB() + changed the signature.
In getOrAddDb(), reduced code clutter and removed useless comments.
Extracted methods kb(), mb(), toBytes(), buildGenIdKey(), buildServerIdKey(), buildGenIdData() for better readability.
Extracted method debug() to reduce code clutter.
Used static imports.

ReplicationServer.java:
Consequence of the rename of ReplicationDbEnv.start().
2 files modified
248 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 246 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -526,7 +526,7 @@
      // Initialize the replicationServer database.
      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
          this);
      dbEnv.start();
      dbEnv.initializeFromChangelogStateDB();
      setServerURL();
      listenSocket = new ServerSocket();
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -39,6 +39,9 @@
import com.sleepycat.je.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -51,7 +54,7 @@
public class ReplicationDbEnv
{
  private Environment dbEnvironment;
  private Database stateDb;
  private Database changelogStateDb;
  private ReplicationServer replicationServer;
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
@@ -100,19 +103,15 @@
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize",
            String.valueOf(2 * 1024 * 1024));
        envConfig.setConfigParam("je.log.iteratorReadSize",
            String.valueOf(2 * 1024 * 1024));
        envConfig.setConfigParam("je.log.faultReadSize",
            String.valueOf(4 * 1024));
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam("je.maxMemory",
            String.valueOf(16 * 1024 * 1024));
        envConfig.setConfigParam("je.maxMemory", mb(16));
      }
      else
      {
@@ -120,8 +119,7 @@
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam("je.maxMemory",
            String.valueOf(5 * 1024 * 1024));
        envConfig.setConfigParam("je.maxMemory", mb(5));
      }
      // Since records are always added at the end of the Replication log and
@@ -141,7 +139,7 @@
       * the topology. The database "changelogstate" is used to store the list
       * of all the servers that have been seen in the past.
       */
      stateDb = openDatabase("changelogstate");
      changelogStateDb = openDatabase("changelogstate");
    }
    catch (RuntimeException e)
    {
@@ -149,6 +147,16 @@
    }
  }
  private String kb(int sizeInKb)
  {
    return String.valueOf(sizeInKb * 1024);
  }
  private String mb(int sizeInMb)
  {
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws RuntimeException
  {
    final DatabaseConfig dbConfig = new DatabaseConfig();
@@ -163,34 +171,31 @@
   *
   * @throws ChangelogException in case of underlying Exception
   */
  public void start() throws ChangelogException
  public void initializeFromChangelogStateDB() throws ChangelogException
  {
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    Cursor cursor = stateDb.openCursor(null, null);
    Cursor cursor = changelogStateDb.openCursor(null, null);
    try
    {
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        String stringData = toString(data.getData());
        final String stringData = toString(data.getData());
        if (debugEnabled())
          TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
              + " Read (tag/baseDn/generationId) OR (serverId/baseDN): "
              + stringData);
          debug("read (" + GENERATION_ID_TAG + " generationId baseDn) OR "
              + "(serverId baseDN): " + stringData);
        String[] str = stringData.split(FIELD_SEPARATOR, 3);
        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
        if (str[0].equals(GENERATION_ID_TAG))
        {
          long generationId = toLong(str[1]);
          String baseDn = str[2];
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                + " Has read baseDn=" + baseDn + " generationId="
                + generationId);
            debug("has read baseDn=" + baseDn + " generationId=" +generationId);
          replicationServer.initDomainGenerationID(baseDn, generationId);
        }
@@ -200,8 +205,7 @@
          String baseDn = str[1];
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
                + " Has read: baseDn=" + baseDn + " serverId=" + serverId);
            debug("has read: baseDn=" + baseDn + " serverId=" + serverId);
          replicationServer.addServerIdToDomain(serverId, baseDn);
        }
@@ -264,73 +268,91 @@
    }
  }
    /**
     * Finds or creates the database used to store changes from the server
     * with the given serverId and the given baseDn.
     *
     * @param serverId     The server id that identifies the server.
     * @param baseDn       The baseDn that identifies the domain.
     * @param generationId The generationId associated to this domain.
     * @return the Database.
     * @throws ChangelogException in case of underlying Exception.
     */
    public Database getOrAddDb(int serverId, String baseDn, long generationId)
        throws ChangelogException
    {
      if (debugEnabled())
        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
          serverId + " " + baseDn + " " + generationId);
      try
      {
        final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn;
        // Opens the database for the changes received from this server
        // on this domain. Create it if it does not already exist.
        Database db = openDatabase(serverIdKey);
        // Creates the record serverId/domain base Dn in the stateDb
        // if it does not already exist.
        putInStateDBIfNotExist(serverIdKey, serverIdKey);
        // Creates the record domain base Dn/ generationId in the stateDb
        // if it does not already exist.
        final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        final String genIdData = GENERATION_ID_TAG
            + FIELD_SEPARATOR + generationId
            + FIELD_SEPARATOR + baseDn;
        putInStateDBIfNotExist(genIdKey, genIdData);
        return db;
      }
      catch (RuntimeException e)
      {
        throw new ChangelogException(e);
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
        return null;
      }
    }
  private void putInStateDBIfNotExist(String keyString, String dataString)
      throws UnsupportedEncodingException, RuntimeException
  private byte[] toBytes(String s)
  {
    byte[] byteId = keyString.getBytes("UTF-8");
    byte[] dataByteId = dataString.getBytes("UTF-8");
    DatabaseEntry key = new DatabaseEntry();
    key.setData(byteId);
    try
    {
      return s.getBytes("UTF-8");
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
      return null;
    }
  }
  /**
   * Finds or creates the database used to store changes from the server with
   * the given serverId and the given baseDn.
   *
   * @param serverId
   *          The server id that identifies the server.
   * @param baseDn
   *          The baseDn that identifies the domain.
   * @param generationId
   *          The generationId associated to this domain.
   * @return the Database.
   * @throws ChangelogException
   *           in case of underlying Exception.
   */
  public Database getOrAddDb(int serverId, String baseDn, long generationId)
      throws ChangelogException
  {
    if (debugEnabled())
      debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDn + ", "
          + generationId + ")");
    try
    {
      // JNR: redundant info is stored between the key and data down below.
      // It is probably ok since "changelogstate" DB does not receive a high
      // volume of inserts.
      final String serverIdToBaseDn = buildServerIdKey(baseDn, serverId);
      // Opens the DB for the changes received from this server on this domain.
      Database db = openDatabase(serverIdToBaseDn);
      putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
      putInChangelogStateDBIfNotExist(buildGenIdKey(baseDn),
                                      buildGenIdData(baseDn, generationId));
      return db;
    }
    catch (RuntimeException e)
    {
      throw new ChangelogException(e);
    }
  }
  private String buildGenIdKey(String baseDn)
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
  }
  private String buildServerIdKey(String baseDn, int serverId)
  {
    return serverId + FIELD_SEPARATOR + baseDn;
  }
  private String buildGenIdData(String baseDn, long generationId)
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
        + baseDn;
  }
  private void putInChangelogStateDBIfNotExist(String keyString,
      String dataString) throws RuntimeException
  {
    DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
    if (status == OperationStatus.NOTFOUND)
    if (changelogStateDb.get(null, key, data, DEFAULT) == NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(dataByteId);
        data.setData(toBytes(dataString));
        if (debugEnabled())
          TRACER.debugInfo("Created in the state Db record key=[" + keyString
          debug("putting record in the changelogstate Db key=[" + keyString
              + "] value=[" + dataString + "]");
        stateDb.put(txn, key, data);
        changelogStateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
      catch (DatabaseException dbe)
@@ -367,7 +389,7 @@
    {
      try
      {
        stateDb.close();
        changelogStateDb.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
@@ -397,18 +419,8 @@
   */
  public void clearGenerationId(String baseDn)
  {
    String methodInvocation = "clearGenerationId(baseDN=" + baseDn + ")";
    String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
    OperationStatus status = deleteFromStateDB(key, methodInvocation);
    if (status == OperationStatus.SUCCESS || status == OperationStatus.KEYEXIST)
    {
      // TODO : should have a better error logging
      if (debugEnabled())
        TRACER.debugInfo("In "
            + this.replicationServer.getMonitorInstanceName()
            + methodInvocation + " failed " + status);
    }
    deleteFromChangelogStateDB(buildGenIdKey(baseDn),
        "clearGenerationId(baseDN=" + baseDn + ")");
  }
  /**
@@ -422,36 +434,30 @@
   */
  public void clearServerId(String baseDn, int serverId)
  {
    String key = serverId + FIELD_SEPARATOR + baseDn;
    deleteFromStateDB(key, "clearServerId(baseDN=" + baseDn + " , serverId="
        + serverId + ")");
    deleteFromChangelogStateDB(buildServerIdKey(baseDn, serverId),
        "clearServerId(baseDN=" + baseDn + " , serverId=" + serverId + ")");
  }
  private OperationStatus deleteFromStateDB(String keyString,
  private void deleteFromChangelogStateDB(String keyString,
      String methodInvocation)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " " + methodInvocation);
      debug(methodInvocation + " starting");
    try
    {
      final byte[] byteId = keyString.getBytes("UTF-8");
      final DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
      final DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status != OperationStatus.NOTFOUND)
      OperationStatus status = changelogStateDb.get(null, key, data, DEFAULT);
      if (status == SUCCESS)
      {
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try
        {
          stateDb.delete(txn, key);
          changelogStateDb.delete(txn, key);
          txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          if (debugEnabled())
            TRACER.debugInfo(" In "
                + this.replicationServer.getMonitorInstanceName() + " "
                + methodInvocation + " succeeded " + status);
            debug(methodInvocation + " succeeded");
        }
        catch (RuntimeException dbe)
        {
@@ -460,18 +466,18 @@
          throw dbe;
        }
      }
      return status;
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
      else
      {
        if (debugEnabled())
          debug(methodInvocation + " failed: key=[ " + keyString
              + "] not found");
      }
    }
    catch (RuntimeException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
    }
    return null;
  }
    /**
@@ -569,4 +575,10 @@
    replicationServer.shutdown();
  }
  private void debug(String message)
  {
    TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ", "
        + message);
  }
}