mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.33.2011 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -32,7 +32,6 @@
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.MessageBuilder;
@@ -40,14 +39,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
@@ -61,9 +53,6 @@
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
@@ -103,71 +92,41 @@
  public void addEntry(int draftCN, String value, String domainBaseDN,
      ChangeNumber changeNumber)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;
      DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
      DatabaseEntry data = new DraftCNData(draftCN,
          value, domainBaseDN, changeNumber);
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      // Use a transaction so that we can override durability.
      Transaction txn = null;
      dbCloseLock.readLock().lock();
      try
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
          DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
          DatabaseEntry data = new DraftCNData(draftCN,
              value, domainBaseDN, changeNumber);
          db.put(txn, key, data);
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (LockConflictException e)
        {
          // Try again.
        }
        finally
        {
          if (txn != null)
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
          }
          dbCloseLock.readLock().unlock();
        }
        txn = dbenv.beginTransaction();
        db.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
      if (!done)
      finally
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        replicationServer.shutdown();
        if (txn != null)
        {
          // No effect if txn has committed.
          try
          {
            txn.abort();
          }
          catch (Exception e)
          {
            // Ignored.
          }
        }
        dbCloseLock.readLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
    }
  }
@@ -229,12 +188,20 @@
  }
  private void closeLockedCursor(Cursor cursor)
  throws DatabaseException
  {
    try
    {
      if (cursor != null)
        cursor.close();
      {
        try
        {
          cursor.close();
        }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
    }
    finally
    {
@@ -248,12 +215,10 @@
   */
  public int readFirstDraftCN()
  {
    Cursor cursor = null;
    String str = null;
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      {
        cursor = db.openCursor(null, null);
@@ -265,9 +230,8 @@
          /* database is empty */
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
        return new Integer(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -277,20 +241,7 @@
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return 0;
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -318,12 +269,10 @@
   */
  public int readLastDraftCN()
  {
    Cursor cursor = null;
    String str = null;
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      {
        cursor = db.openCursor(null, null);
@@ -335,9 +284,8 @@
          /* database is empty */
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
        return new Integer(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -346,16 +294,7 @@
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return 0;
    }
    catch (Exception e)
    {
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -519,20 +458,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
@@ -540,20 +466,11 @@
        {
          txn.commit();
        }
        catch (Exception e)
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -574,26 +491,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
@@ -601,20 +499,11 @@
        {
          txn.abort();
        }
        catch (Exception e)
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**