mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.03.2013 a3782e7e00bbdeae37df8b8388fb6fecc17ede27
ReplicationDbEnv.java:
Extracted methods readDomainBaseDNGenerationIDRecords(), readServerIdDomainBaseDNRecords(), toInt(), toLong() and toString() from start().
Extracted method putInStateDBIfNotExist() from getOrAddDb().
Extracted method deleteFromStateDB() from clearGenerationId() and clearServerId().
1 files modified
528 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 528 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -26,24 +26,23 @@
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.DebugLogLevel;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.*;
import java.util.concurrent.TimeUnit;
/**
 * This class is used to represent a Db environment that can be used
 * to create ReplicationDB.
@@ -164,122 +163,8 @@
    try
    {
      /*
       *  Get the domain base DN/ generationIDs records
       */
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        try
        {
          String stringData = new String(data.getData(), "UTF-8");
          if (debugEnabled())
            TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " Read tag baseDn generationId=" + stringData);
          String[] str = stringData.split(FIELD_SEPARATOR, 3);
          if (str[0].equals(GENERATION_ID_TAG))
          {
            long generationId;
            try
            {
              // <generationId>
              generationId = new Long(str[1]);
            }
            catch (NumberFormatException e)
            {
              // should never happen
              // TODO: i18n
              throw new ReplicationDBException(Message.raw(
                  "replicationServer state database has a wrong format: " +
                  e.getLocalizedMessage()
                  + "<" + str[1] + ">"));
            }
            String baseDn = str[2];
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " Has read baseDn=" + baseDn
                + " generationId=" + generationId);
            replicationServer.getReplicationServerDomain(baseDn, true).
            initGenerationID(generationId);
          }
        }
        catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw("need UTF-8 support"));
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      /*
       * Get the server Id / domain base DN records
       */
      status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        String stringData;
        try
        {
          stringData = new String(data.getData(), "UTF-8");
        }
        catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
          "need UTF-8 support"));
        }
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " Read serverId BaseDN=" + stringData);
        String[] str = stringData.split(FIELD_SEPARATOR, 2);
        if (!str[0].equals(GENERATION_ID_TAG))
        {
          int serverId;
          try
          {
            // <serverId>
            serverId = new Integer(str[0]);
          } catch (NumberFormatException e)
          {
            // should never happen
            // TODO: i18n
            throw new ReplicationDBException(Message.raw(
                "replicationServer state database has a wrong format: " +
                e.getLocalizedMessage()
                + "<" + str[0] + ">"));
          }
          // <baseDn>
          String baseDn = str[1];
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " Has read: baseDn=" + baseDn
              + " serverId=" + serverId);
          DbHandler dbHandler =
            new DbHandler(
                serverId, baseDn, replicationServer, this,
                replicationServer.getQueueSize());
          replicationServer.getReplicationServerDomain(baseDn, true).
          setDbHandler(serverId, dbHandler);
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      readDomainBaseDNGenerationIDRecords(key, data, cursor);
      readServerIdDomainBaseDNRecords(key, data, cursor);
    }
    finally
    {
@@ -294,6 +179,124 @@
    }
  }
  private void readDomainBaseDNGenerationIDRecords(DatabaseEntry key,
      DatabaseEntry data, Cursor cursor) throws ReplicationDBException
  {
    /*
     * Get the domain base DN/ generationIDs records
     */
    OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    {
      String stringData = toString(data.getData());
      if (debugEnabled())
        TRACER.debugInfo("In "
            + this.replicationServer.getMonitorInstanceName()
            + " Read tag baseDn generationId=" + stringData);
      String[] str = stringData.split(FIELD_SEPARATOR, 3);
      if (str[0].equals(GENERATION_ID_TAG))
      {
        long generationId = toLong(str[1]);
        String baseDn = str[2];
        if (debugEnabled())
          TRACER.debugInfo("In "
              + this.replicationServer.getMonitorInstanceName()
              + " Has read baseDn=" + baseDn + " generationId=" + generationId);
        replicationServer.getReplicationServerDomain(baseDn, true)
            .initGenerationID(generationId);
      }
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
  }
  private void readServerIdDomainBaseDNRecords(DatabaseEntry key,
      DatabaseEntry data, Cursor cursor) throws ReplicationDBException
  {
    /*
     * Get the server Id / domain base DN records
     */
    OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    {
      String stringData = toString(data.getData());
      if (debugEnabled())
        TRACER.debugInfo("In "
            + this.replicationServer.getMonitorInstanceName()
            + " Read serverId BaseDN=" + stringData);
      String[] str = stringData.split(FIELD_SEPARATOR, 2);
      if (!str[0].equals(GENERATION_ID_TAG))
      {
        int serverId = toInt(str[0]);
        String baseDn = str[1];
        if (debugEnabled())
          TRACER.debugInfo("In "
              + this.replicationServer.getMonitorInstanceName()
              + " Has read: baseDn=" + baseDn + " serverId=" + serverId);
        DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this,
                replicationServer.getQueueSize());
        replicationServer.getReplicationServerDomain(baseDn, true)
            .setDbHandler(serverId, dbHandler);
      }
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
  }
  private int toInt(String data) throws ReplicationDBException
  {
    try
    {
      return Integer.parseInt(data);
    } catch (NumberFormatException e)
    {
      // should never happen
      // TODO: i18n
      throw new ReplicationDBException(Message
          .raw("replicationServer state database has a wrong format: "
              + e.getLocalizedMessage() + "<" + data + ">"));
    }
  }
  private long toLong(String data) throws ReplicationDBException
  {
    try
    {
      return Long.parseLong(data);
    }
    catch (NumberFormatException e)
    {
      // should never happen
      // TODO: i18n
      throw new ReplicationDBException(Message
          .raw("replicationServer state database has a wrong format: "
              + e.getLocalizedMessage() + "<" + data + ">"));
    }
  }
  private String toString(byte[] data) throws ReplicationDBException
  {
    try
    {
      return new String(data, "UTF-8");
    }
    catch (UnsupportedEncodingException e)
    {
      // should never happens
      // TODO: i18n
      throw new ReplicationDBException(Message.raw("need UTF-8 support"));
    }
  }
    /**
     * Finds or creates the database used to store changes from the server
     * with the given serverId and the given baseDn.
@@ -304,7 +307,7 @@
     * @return the Database.
     * @throws DatabaseException in case of underlying Exception.
     */
    public Database getOrAddDb(int serverId, String baseDn, Long generationId)
    public Database getOrAddDb(int serverId, String baseDn, long generationId)
    throws DatabaseException
    {
      if (debugEnabled())
@@ -312,71 +315,25 @@
          serverId + " " + baseDn + " " + generationId);
      try
      {
        String stringId = serverId + FIELD_SEPARATOR + baseDn;
        String key = serverId + FIELD_SEPARATOR + baseDn;
        // Opens the database for the changes received from this server
        // on this domain. Create it if it does not already exist.
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
        Database db = dbEnvironment.openDatabase(null, key, dbConfig);
        // Creates the record serverId/domain base Dn in the stateDb
        // if it does not already exist.
        byte[] byteId;
        byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(byteId);
            if (debugEnabled())
              TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
                " serverId/Domain=<"+stringId+">");
            stateDb.put(txn, key, data);
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        putInStateDBIfNotExist(key, key);
        // Creates the record domain base Dn/ generationId in the stateDb
        // if it does not already exist.
        stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
        generationId.toString() + FIELD_SEPARATOR + baseDn;
        byteId = stringId.getBytes("UTF-8");
        byte[] dataByteId;
        dataByteId = dataStringId.getBytes("UTF-8");
        key = new DatabaseEntry();
        key.setData(byteId);
        data = new DatabaseEntry();
        status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(dataByteId);
            if (debugEnabled())
              TRACER.debugInfo(
                  "Created in the state Db record Tag/Domain/GenId key=" +
                  stringId + " value=" + dataStringId);
            stateDb.put(txn, key, data);
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
                + FIELD_SEPARATOR + baseDn;
        putInStateDBIfNotExist(key, data);
        return db;
      }
      catch (UnsupportedEncodingException e)
@@ -386,6 +343,36 @@
      }
    }
  private void putInStateDBIfNotExist(String keyString, String dataString)
      throws UnsupportedEncodingException
  {
    byte[] byteId = keyString.getBytes("UTF-8");
    byte[] dataByteId = dataString.getBytes("UTF-8");
    DatabaseEntry key = new DatabaseEntry();
    key.setData(byteId);
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
    if (status == OperationStatus.NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(dataByteId);
        if (debugEnabled())
          TRACER.debugInfo("Created in the state Db record key=[" + keyString
              + "] value=[" + dataString + "]");
        stateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
      catch (DatabaseException dbe)
      {
        // Abort the txn and propagate the Exception to the caller
        txn.abort();
        throw dbe;
      }
    }
  }
    /**
     * Creates a new transaction.
     *
@@ -425,122 +412,91 @@
      }
    }
    /**
     * Clears the provided generationId associated to the provided baseDn
     * from the state Db.
     *
     * @param baseDn The baseDn for which the generationID must be cleared.
     *
     */
    public void clearGenerationId(String baseDn)
  /**
   * Clears the provided generationId associated to the provided baseDn from the
   * state Db.
   *
   * @param baseDn
   *          The baseDn for which the generationID must be cleared.
   */
  public void clearGenerationId(String baseDn)
  {
    String methodInvocation = "clearGenerationId(baseDN=" + baseDn + ")";
    String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
    OperationStatus status = deleteFromStateDB(key, methodInvocation);
    if (status == OperationStatus.SUCCESS || status == OperationStatus.KEYEXIST)
    {
      // TODO : should have a better error logging
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
          " clearGenerationId " + baseDn);
      try
        TRACER.debugInfo("In "
            + this.replicationServer.getMonitorInstanceName()
            + methodInvocation + " failed " + status);
    }
  }
  /**
   * Clears the provided serverId associated to the provided baseDn from the
   * state Db.
   *
   * @param baseDn
   *          The baseDn for which the generationID must be cleared.
   * @param serverId
   *          The serverId to remove from the Db.
   */
  public void clearServerId(String baseDn, int serverId)
  {
    String key = serverId + FIELD_SEPARATOR + baseDn;
    deleteFromStateDB(key, "clearServerId(baseDN=" + baseDn + " , serverId="
        + serverId + ")");
  }
  private OperationStatus deleteFromStateDB(String keyString,
      String methodInvocation)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + " " + methodInvocation);
    try
    {
      final byte[] byteId = keyString.getBytes("UTF-8");
      final DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      final DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status != OperationStatus.NOTFOUND)
      {
        // Deletes the record domain base Dn/ generationId in the stateDb
        String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        byte[] byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if ((status == OperationStatus.SUCCESS) ||
            (status == OperationStatus.KEYEXIST))
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try
          {
            stateDb.delete(txn, key);
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " clearGenerationId (" + baseDn +") succeeded.");
          }
          catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        else
        {
          // TODO : should have a better error logging
          stateDb.delete(txn, key);
          txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " clearGenerationId ("+ baseDn + " failed" + status.toString());
            TRACER.debugInfo(" In "
                + this.replicationServer.getMonitorInstanceName() + " "
                + methodInvocation + " succeeded " + status);
        }
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
      }
      catch (DatabaseException dbe)
      {
        // can't happen
      }
    }
    /**
     * Clears the provided serverId associated to the provided baseDn
     * from the state Db.
     *
     * @param baseDn The baseDn for which the generationID must be cleared.
     * @param serverId The serverId to remove from the Db.
     *
     */
    public void clearServerId(String baseDn, int serverId)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
      try
      {
        String stringId = serverId + FIELD_SEPARATOR + baseDn;
        // Deletes the record serverId/domain base Dn in the stateDb
        byte[] byteId;
        byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status != OperationStatus.NOTFOUND)
        catch (DatabaseException dbe)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(byteId);
            stateDb.delete(txn, key);
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
            if (debugEnabled())
              TRACER.debugInfo(
                  " In " + this.replicationServer.getMonitorInstanceName() +
                  " clearServerId() succeeded " + baseDn + " " +
                  serverId);
          }
          catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
          // Abort the txn and propagate the Exception to the caller
          txn.abort();
          throw dbe;
        }
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
      }
      catch (DatabaseException dbe)
      {
        // can't happen
      }
      return status;
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
    }
    catch (DatabaseException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
    }
    return null;
  }
    /**
     * Clears the database.