mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
01.40.2008 7e6c6657bced35f4a3aba723c2add20923450ad6
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -42,6 +42,7 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.je.DeadlockException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -59,6 +60,9 @@
  private Short serverId;
  private DN baseDn;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
@@ -95,28 +99,49 @@
    try
    {
      txn = dbenv.beginTransaction();
      int tries = 0;
      boolean done = false;
      for (UpdateMessage change : changes)
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
        DatabaseEntry data = new ReplicationData(change);
        try
        {
          db.put(txn, key, data);
        } catch (DatabaseException e)
          txn = dbenv.beginTransaction();
          for (UpdateMessage change : changes)
          {
            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
            DatabaseEntry data = new ReplicationData(change);
            db.put(txn, key, data);
          }
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (DeadlockException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          txn.abort();
          txn = null;
        }
      }
      txn.commitWriteNoSync();
      txn = null;
      if (!done)
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        if (txn != null)
        {
           txn.abort();
        }
        replicationServer.shutdown();
      }
    }
    catch (DatabaseException e)
    {
@@ -332,31 +357,40 @@
    {
      cursor = db.openCursor(txn, null);
      if (startingChangeNumber != null)
      try
      {
        key = new ReplicationKey(startingChangeNumber);
        data = new DatabaseEntry();
        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
          OperationStatus.SUCCESS)
        if (startingChangeNumber != null)
        {
          if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
          key = new ReplicationKey(startingChangeNumber);
          data = new DatabaseEntry();
          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          {
            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              throw new Exception("ChangeNumber not available");
          }
          else
          {
            DatabaseEntry key = new DatabaseEntry();
            DatabaseEntry data = new DatabaseEntry();
            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
             OperationStatus.SUCCESS)
           {
             cursor = db.openCursor(txn, null);
           }
            }
            else
            {
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                cursor.close();
                cursor = db.openCursor(txn, null);
              }
            }
          }
        }
      }
      catch (Exception e)
      {
        cursor.close();
        throw (e);
      }
    }
    private ReplServerDBCursor() throws DatabaseException
@@ -370,13 +404,15 @@
     */
    public void close()
    {
      if (cursor == null)
        return;
      try
      {
        cursor.close();
        cursor = null;
      } catch (DatabaseException e)
        if (cursor != null)
        {
          cursor.close();
          cursor = null;
        }
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
@@ -401,6 +437,52 @@
    }
    /**
     * Abort the Cursor after a Deadlock Exception.
     * This method catch and ignore the DeadlockException because
     * this must be done when aborting a cursor after a DeadlockException
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
    public void abort()
    {
      if (cursor == null)
        return;
      try
      {
        cursor.close();
        cursor = null;
      }
      catch (DeadlockException e1)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
        }
      }
    }
    /**
     * Get the next ChangeNumber in the database from this Cursor.
     *
     * @return The next ChangeNumber in the database from this cursor.