mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 13ac0c8454cbbdfa9643f88a1a10c1a03b1313d2
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -25,10 +25,12 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -58,6 +60,12 @@
  private Environment dbEnvironment = null;
  private Database stateDb = null;
  private ReplicationServer replicationServer = null;
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Initialize this class.
@@ -117,16 +125,120 @@
    Cursor cursor = stateDb.openCursor(null, null);
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    try
    {
      /*
       *  Get the domain base DN/ generationIDs records
       */
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        try
        {
          String stringData = new String(data.getData(), "UTF-8");
          String[] str = stringData.split(" ", 2);
          short serverId = new Short(str[0]);
          if (debugEnabled())
            TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " Read tag baseDn generationId=" + stringData);
          String[] str = stringData.split(FIELD_SEPARATOR, 3);
          if (str[0].equals(GENERATION_ID_TAG))
          {
            long generationId=-1;
            DN baseDn;
            try
            {
              // <generationId>
              generationId = new Long(str[1]);
            }
            catch (NumberFormatException e)
            {
              // should never happen
              // TODO: i18n
              throw new ReplicationDBException(Message.raw(
                  "replicationServer state database has a wrong format: " +
                  e.getLocalizedMessage()
                  + "<" + str[1] + ">"));
            }
            // <baseDn>
            baseDn = null;
            try
            {
              baseDn = DN.decode(str[2]);
            } catch (DirectoryException e)
            {
              Message message =
                ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
              logError(message);
            }
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " Has read baseDn=" + baseDn
                + " generationId=" + generationId);
            replicationServer.getReplicationCache(baseDn, true).
            setGenerationId(generationId, true);
          }
        }
        catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw("need UTF-8 support"));
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      /*
       * Get the server Id / domain base DN records
       */
      status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        String stringData = null;
        try
        {
          stringData = new String(data.getData(), "UTF-8");
        }
        catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
          "need UTF-8 support"));
        }
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " Read serverId BaseDN=" + stringData);
        String[] str = stringData.split(FIELD_SEPARATOR, 2);
        if (!str[0].equals(GENERATION_ID_TAG))
        {
          short serverId = -1;
          try
          {
            // <serverId>
            serverId = new Short(str[0]);
          } catch (NumberFormatException e)
          {
            // should never happen
            // TODO: i18n
            throw new ReplicationDBException(Message.raw(
                "replicationServer state database has a wrong format: " +
                e.getLocalizedMessage()
                + "<" + str[0] + ">"));
          }
          // <baseDn>
          DN baseDn = null;
          try
          {
@@ -134,113 +246,302 @@
          } catch (DirectoryException e)
          {
            Message message =
                ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
              ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
            logError(message);
          }
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " Has read: baseDn=" + baseDn
              + " serverId=" + serverId);
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this);
          replicationServer.getReplicationCache(baseDn).newDb(serverId,
                                                            dbHandler);
        } catch (NumberFormatException e)
        {
          // should never happen
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
              "replicationServer state database has a wrong format"));
        } catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
                  "need UTF-8 support"));
            new DbHandler(serverId, baseDn, replicationServer, this, 1);
          replicationServer.getReplicationCache(baseDn, true).
          setDbHandler(serverId, dbHandler);
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      cursor.close();
    } catch (DatabaseException dbe) {
    }
    catch (DatabaseException dbe)
    {
      cursor.close();
      throw dbe;
    }
  }
  /**
   * Find or create the database used to store changes from the server
   * with the given serverId and the given baseDn.
   * @param serverId The server id that identifies the server.
   * @param baseDn The baseDn that identifies the server.
   * @return the Database.
   * @throws DatabaseException in case of underlying Exception.
   */
  public Database getOrAddDb(Short serverId, DN baseDn)
                  throws DatabaseException
  {
    try
    /**
     * Finds or creates the database used to store changes from the server
     * with the given serverId and the given baseDn.
     *
     * @param serverId     The server id that identifies the server.
     * @param baseDn       The baseDn that identifies the domain.
     * @param generationId The generationId associated to this domain.
     * @return the Database.
     * @throws DatabaseException in case of underlying Exception.
     */
    public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
    throws DatabaseException
    {
      String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
      byte[] byteId;
      byteId = stringId.getBytes("UTF-8");
      // Open the database. Create it if it does not already exist.
      DatabaseConfig dbConfig = new DatabaseConfig();
      dbConfig.setAllowCreate(true);
      dbConfig.setTransactional(true);
      Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
      DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
      if (debugEnabled())
        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
          serverId + " " + baseDn + " " + generationId);
      try
      {
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try {
          data.setData(byteId);
          stateDb.put(txn, key, data);
          txn.commitWriteNoSync();
        } catch (DatabaseException dbe)
        String stringId = serverId.toString() + FIELD_SEPARATOR
                          + baseDn.toNormalizedString();
        // Opens the database for the changes received from this server
        // on this domain. Create it if it does not already exist.
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
        // Creates the record serverId/domain base Dn in the stateDb
        // if it does not already exist.
        byte[] byteId;
        byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
        {
          // Abort the txn and propagate the Exception to the caller
          txn.abort();
          throw dbe;
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(byteId);
            if (debugEnabled())
              TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
                " serverId/Domain=<"+stringId+">");
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        // Creates the record domain base Dn/ generationId in the stateDb
        // if it does not already exist.
        stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
          baseDn.toNormalizedString();
        String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
        generationId.toString() + FIELD_SEPARATOR +
          baseDn.toNormalizedString();
        byteId = stringId.getBytes("UTF-8");
        byte[] dataByteId;
        dataByteId = dataStringId.getBytes("UTF-8");
        key = new DatabaseEntry();
        key.setData(byteId);
        data = new DatabaseEntry();
        status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(dataByteId);
            if (debugEnabled())
              TRACER.debugInfo(
                  "Created in the state Db record Tag/Domain/GenId key=" +
                  stringId + " value=" + dataStringId);
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        return db;
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
        return null;
      }
    }
    /**
     * Creates a new transaction.
     *
     * @return the transaction.
     * @throws DatabaseException in case of underlying database Exception.
     */
    public Transaction beginTransaction() throws DatabaseException
    {
      return dbEnvironment.beginTransaction(null, null);
    }
    /**
     * Shutdown the Db environment.
     */
    public void shutdown()
    {
      try
      {
        stateDb.close();
        dbEnvironment.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
    }
    /**
     * Clears the provided generationId associated to the provided baseDn
     * from the state Db.
     *
     * @param baseDn The baseDn for which the generationID must be cleared.
     *
     */
    public void clearGenerationId(DN baseDn)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
          " clearGenerationId " + baseDn);
      try
      {
        // Deletes the record domain base Dn/ generationId in the stateDb
        String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
          baseDn.toNormalizedString();
        byte[] byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if ((status == OperationStatus.SUCCESS) ||
            (status == OperationStatus.KEYEXIST))
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try
          {
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " clearGenerationId (" +
                baseDn +") succeeded.");
          }
          catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        else
        {
          // TODO : should have a better error logging
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " clearGenerationId ("+ baseDn + " failed" + status.toString());
        }
      }
      return db;
    } catch (UnsupportedEncodingException e)
    {
      // can't happen
      return null;
      catch (UnsupportedEncodingException e)
      {
        // can't happen
      }
      catch (DatabaseException dbe)
      {
        // can't happen
      }
    }
  }
  /**
   * Creates a new transaction.
   *
   * @return the transaction.
   * @throws DatabaseException in case of underlying database Exception.
   */
  public Transaction beginTransaction() throws DatabaseException
  {
    return dbEnvironment.beginTransaction(null, null);
  }
    /**
     * Clears the provided serverId associated to the provided baseDn
     * from the state Db.
     *
     * @param baseDn The baseDn for which the generationID must be cleared.
     * @param serverId The serverId to remove from the Db.
     *
     */
    public void clearServerId(DN baseDn, Short serverId)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
      try
      {
        String stringId = serverId.toString() + FIELD_SEPARATOR
                         + baseDn.toNormalizedString();
  /**
   * Shutdown the Db environment.
   */
  public void shutdown()
  {
    try
    {
      stateDb.close();
      dbEnvironment.close();
    } catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
        // Deletes the record serverId/domain base Dn in the stateDb
        byte[] byteId;
        byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status != OperationStatus.NOTFOUND)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(byteId);
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            if (debugEnabled())
              TRACER.debugInfo(
                  " In " + this.replicationServer.getMonitorInstanceName() +
                  " clearServerId() succeeded " + baseDn + " " +
                  serverId);
          }
          catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
      }
      catch (DatabaseException dbe)
      {
        // can't happen
      }
    }
  }
    /**
     * Clears the database.
     *
     * @param databaseName The name of the database to clear.
     */
    public final void clearDb(String databaseName)
    {
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              "clearDb" + databaseName);
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        dbEnvironment.truncateDatabase(txn, databaseName, false);
      }
      catch (DatabaseException dbe)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
            dbe.getLocalizedMessage()));
        logError(mb.toMessage());
      }
    }
}