mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.22.2013 45a8024fe68e7bc451a5a22afcaf31e7edb745a1
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -27,6 +27,19 @@
 */
package org.opends.server.replication.server;
import java.io.Closeable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -35,16 +48,6 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
 * and the dbHandler class.
@@ -53,6 +56,8 @@
public class DraftCNDB
{
  private static final DebugTracer TRACER = getTracer();
  private static final int DATABASE_EMPTY = 0;
  private Database db = null;
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;
@@ -61,27 +66,23 @@
   * The lock used to provide exclusive access to the thread that close the db
   * (shutdown or clear).
   */
  private ReentrantReadWriteLock dbCloseLock;
  private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param replicationServer The ReplicationServer that needs to be shutdown.
   * @param dbenv The Db environment to use to create the db.
   * @throws DatabaseException If a database problem happened.
   * @throws ChangelogException If a database problem happened.
   */
  public DraftCNDB(
      ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
  throws DatabaseException
  public DraftCNDB(ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      throws ChangelogException
  {
    this.dbenv = dbenv;
    this.replicationServer = replicationServer;
    // Get or create the associated ReplicationServerDomain and Db.
    db = dbenv.getOrCreateDraftCNDb();
    dbCloseLock = new ReentrantReadWriteLock(true);
  }
  /**
@@ -101,8 +102,7 @@
    try
    {
      DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
      DatabaseEntry data = new DraftCNData(
          value, domainBaseDN, changeNumber);
      DatabaseEntry data = new DraftCNData(value, domainBaseDN, changeNumber);
      // Use a transaction so that we can override durability.
      Transaction txn = null;
@@ -121,24 +121,40 @@
      }
      finally
      {
        if (txn != null)
        {
          // No effect if txn has committed.
          try
          {
            txn.abort();
          }
          catch (Exception e)
          {
            // Ignored.
          }
        }
        abort(txn);
        dbCloseLock.readLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
    }
    catch (ChangelogException e)
    {
      replicationServer.handleUnexpectedChangelogException(e);
    }
  }
  /**
   * Aborts the current transaction. It has no effect if the transaction has
   * committed.
   *
   * @param txn
   *          the transaction to abort
   */
  private static void abort(Transaction txn)
  {
    if (txn != null)
    {
      try
      {
        txn.abort();
      }
      catch (DatabaseException ignored)
      {
        // Ignore.
        TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
      }
    }
  }
@@ -147,18 +163,11 @@
   */
  public void shutdown()
  {
    dbCloseLock.writeLock().lock();
    try
    {
      dbCloseLock.writeLock().lock();
      try
      {
        db.close();
        db = null;
      }
      finally
      {
        dbCloseLock.writeLock().unlock();
      }
      db.close();
      db = null;
    }
    catch (DatabaseException e)
    {
@@ -167,19 +176,21 @@
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
    }
    finally
    {
      dbCloseLock.writeLock().unlock();
    }
  }
  /**
   * Create a cursor that can be used to search or iterate on this DB.
   *
   * @param draftCN The draftCN from which the cursor must start.
   * @throws DatabaseException If a database error prevented the cursor
   * @throws ChangelogException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   * @return The ReplServerDBCursor.
   */
  public DraftCNDBCursor openReadCursor(int draftCN)
  throws DatabaseException, Exception
  public DraftCNDBCursor openReadCursor(int draftCN) throws ChangelogException
  {
    return new DraftCNDBCursor(draftCN);
  }
@@ -188,14 +199,11 @@
   * Create a cursor that can be used to delete some record from this
   * ReplicationServer database.
   *
   * @throws DatabaseException If a database error prevented the cursor
   * @throws ChangelogException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   *
   * @return The ReplServerDBCursor.
   */
  public DraftCNDBCursor openDeleteCursor()
  throws DatabaseException, Exception
  public DraftCNDBCursor openDeleteCursor() throws ChangelogException
  {
    return new DraftCNDBCursor();
  }
@@ -235,11 +243,10 @@
        DatabaseEntry entry = new DatabaseEntry();
        if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS)
        {
          /* database is empty */
          return 0;
          return DATABASE_EMPTY;
        }
        return new Integer(decodeUTF8(key.getData()));
        return Integer.parseInt(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -248,8 +255,7 @@
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -305,11 +311,10 @@
        DatabaseEntry entry = new DatabaseEntry();
        if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS)
        {
          /* database is empty */
          return 0;
          return DATABASE_EMPTY;
        }
        return new Integer(decodeUTF8(key.getData()));
        return Integer.parseInt(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -318,11 +323,17 @@
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
  private void handleUnexpectedDatabaseException(DatabaseException e)
  {
    ChangelogException ex = new ChangelogException(e);
    replicationServer.handleUnexpectedChangelogException(ex);
  }
  /**
   * {@inheritDoc}
   */
@@ -357,10 +368,10 @@
     *
     * @param startingDraftCN
     *          the draftCN from which the cursor must start.
     * @throws Exception
     * @throws ChangelogException
     *           when the startingDraftCN does not exist.
     */
    private DraftCNDBCursor(int startingDraftCN) throws Exception
    private DraftCNDBCursor(int startingDraftCN) throws ChangelogException
    {
      this.key = new ReplicationDraftCNKey(startingDraftCN);
      this.entry = new DatabaseEntry();
@@ -391,8 +402,9 @@
            if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              throw new Exception("ChangeLog Draft Change Number "
                  + startingDraftCN + " is not available");
              throw new ChangelogException(
                  Message.raw("ChangeLog Draft Change Number " + startingDraftCN
                      + " is not available"));
            }
            if (localCursor.getPrev(key, entry, LockMode.DEFAULT) != SUCCESS)
@@ -414,7 +426,13 @@
        this.txn = null;
        this.cursor = localCursor;
      }
      catch (Exception e)
      catch (DatabaseException e)
      {
        // Unlocking is required before throwing any exception
        closeLockedCursor(localCursor);
        throw new ChangelogException(e);
      }
      catch (ChangelogException e)
      {
        // Unlocking is required before throwing any exception
        closeLockedCursor(localCursor);
@@ -424,7 +442,7 @@
    private DraftCNDBCursor() throws Exception
    private DraftCNDBCursor() throws ChangelogException
    {
      Transaction localTxn = null;
      Cursor localCursor = null;
@@ -453,32 +471,20 @@
        this.txn = localTxn;
        this.cursor = localCursor;
      }
      catch (Exception e)
      catch (DatabaseException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (DatabaseException ignored)
        {
          // Ignore.
          TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
        }
        closeLockedCursor(localCursor);
        DraftCNDB.abort(localTxn);
        throw new ChangelogException(e);
      }
      catch (ChangelogException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        if (localTxn != null)
        {
          try
          {
            localTxn.abort();
          }
          catch (DatabaseException ignored)
          {
            // Ignore.
            TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
          }
        }
        closeLockedCursor(localCursor);
        DraftCNDB.abort(localTxn);
        throw e;
      }
    }
@@ -508,7 +514,7 @@
        }
        catch (DatabaseException e)
        {
          replicationServer.handleUnexpectedDatabaseException(e);
          handleUnexpectedDatabaseException(e);
        }
      }
    }
@@ -541,7 +547,7 @@
        }
        catch (DatabaseException e)
        {
          replicationServer.handleUnexpectedDatabaseException(e);
          handleUnexpectedDatabaseException(e);
        }
      }
    }
@@ -593,7 +599,6 @@
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return null;
    }
@@ -619,7 +624,6 @@
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return -1;
    }
@@ -651,22 +655,22 @@
    /**
     * Go to the next record on the cursor.
     * @return the next record on this cursor.
     * @throws DatabaseException a.
     * @throws ChangelogException a.
     */
    public boolean next() throws DatabaseException
    public boolean next() throws ChangelogException
    {
      if (isClosed)
      {
        return false;
      }
      OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        seqnumData = null;
        return false;
      }
      try {
        OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          seqnumData = null;
          return false;
        }
        seqnumData = new DraftCNData(entry.getData());
      }
      catch(Exception e)
@@ -679,16 +683,23 @@
    /**
     * Delete the record at the current cursor position.
     *
     * @throws DatabaseException In case of database problem.
     * @throws ChangelogException In case of database problem.
     */
    public void delete() throws DatabaseException
    public void delete() throws ChangelogException
    {
      if (isClosed)
      {
        throw new IllegalStateException("DraftCNDB already closed");
      }
      cursor.delete();
      try
      {
        cursor.delete();
      }
      catch (DatabaseException e)
      {
        throw new ChangelogException(e);
      }
    }
    /**
@@ -710,10 +721,9 @@
  /**
   * Clears this change DB from the changes it contains.
   *
   * @throws Exception Throws an exception it occurs.
   * @throws DatabaseException Throws a DatabaseException when it occurs.
   * @throws ChangelogException Throws a DatabaseException when it occurs.
   */
  public void clear() throws Exception, DatabaseException
  public void clear() throws ChangelogException
  {
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();