mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.22.2013 45a8024fe68e7bc451a5a22afcaf31e7edb745a1
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -27,26 +27,28 @@
 */
package org.opends.server.replication.server;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.util.StaticUtils;
import com.sleepycat.je.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class implements the interface between the underlying database
 * and the dbHandler class.
@@ -67,8 +69,7 @@
   * The lock used to provide exclusive access to the thread that close the db
   * (shutdown or clear).
   */
  private final ReentrantReadWriteLock dbCloseLock =
      new ReentrantReadWriteLock(true);
  private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
  // Change counter management
  // The Db itself does not allow to count records between a start and an end
@@ -117,12 +118,12 @@
   * @param baseDn The baseDn of the replication domain.
   * @param replicationServer The ReplicationServer that needs to be shutdown.
   * @param dbenv The Db environment to use to create the db.
   * @throws DatabaseException If a database problem happened.
   * @throws ChangelogException If a database problem happened.
   */
  public ReplicationDB(int serverId, String baseDn,
                     ReplicationServer replicationServer,
                     ReplicationDbEnv dbenv)
                     throws DatabaseException
                     throws ChangelogException
  {
    this.serverId = serverId;
    this.baseDn = baseDn;
@@ -138,13 +139,15 @@
    intializeCounters();
  }
  private void intializeCounters()
  private void intializeCounters() throws ChangelogException
  {
    this.counterCurrValue = 1;
    Cursor cursor = db.openCursor(null, null);
    Cursor cursor = null;
    try
    {
      cursor = db.openCursor(null, null);
      int distBackToCounterRecord = 0;
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
@@ -164,9 +167,13 @@
      }
      counterCurrValue += distBackToCounterRecord;
    }
    catch (DatabaseException e)
    {
      throw new ChangelogException(e);
    }
    finally
    {
      cursor.close();
      close(cursor);
    }
  }
@@ -205,7 +212,7 @@
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
    }
    finally
    {
@@ -213,7 +220,14 @@
    }
  }
  private void handleUnexpectedDatabaseException(DatabaseException e)
  {
    ChangelogException ex = new ChangelogException(e);
    replicationServer.handleUnexpectedChangelogException(ex);
  }
  private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
      throws DatabaseException
  {
    if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
    {
@@ -276,13 +290,12 @@
   * ReplicationServer DB.
   *
   * @param changeNumber The ChangeNumber from which the cursor must start.
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   * @throws ChangelogException
   *           When a problem occurs or the startingChangeNumber does not exist.
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
                throws DatabaseException, Exception
      throws ChangelogException
  {
    return new ReplServerDBCursor(changeNumber);
  }
@@ -291,21 +304,19 @@
   * Create a cursor that can be used to delete some record from this
   * ReplicationServer database.
   *
   * @throws DatabaseException If a database error prevented the cursor
   * @throws ChangelogException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor()
                throws DatabaseException, Exception
  public ReplServerDBCursor openDeleteCursor() throws ChangelogException
  {
    return new ReplServerDBCursor();
  }
  private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException
  private void closeAndReleaseReadLock(Cursor cursor)
  {
    try
    {
@@ -362,7 +373,7 @@
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
      return null;
    }
    finally
@@ -421,7 +432,7 @@
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
      return null;
    }
    finally
@@ -482,7 +493,7 @@
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
    }
    finally
    {
@@ -492,7 +503,7 @@
  }
  private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
      DatabaseEntry data)
      DatabaseEntry data) throws DatabaseException
  {
    final ChangeNumber cn = toChangeNumber(key.getData());
    if (!isACounterRecord(cn))
@@ -548,11 +559,11 @@
     *
     * @param startingChangeNumber
     *          The ChangeNumber from which the cursor must start.
     * @throws Exception
     * @throws ChangelogException
     *           When the startingChangeNumber does not exist.
     */
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
        throws Exception
        throws ChangelogException
    {
      if (startingChangeNumber != null)
      {
@@ -591,7 +602,8 @@
            if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              throw new Exception("ChangeNumber not available");
              throw new ChangelogException(
                  Message.raw("ChangeNumber not available"));
            }
            // We can move close to the startingChangeNumber.
@@ -607,15 +619,21 @@
        }
        cursor = localCursor;
      }
      catch (Exception e)
      catch (ChangelogException e)
      {
        // Unlocking is required before throwing any exception
        closeAndReleaseReadLock(localCursor);
        throw e;
      }
      catch (DatabaseException e)
      {
        // Unlocking is required before throwing any exception
        closeAndReleaseReadLock(localCursor);
        throw new ChangelogException(e);
      }
    }
    private ReplServerDBCursor() throws Exception
    private ReplServerDBCursor() throws ChangelogException
    {
      key = new DatabaseEntry();
      data = new DatabaseEntry();
@@ -644,22 +662,32 @@
        txn = localTxn;
        cursor = localCursor;
      }
      catch (ChangelogException e)
      {
        closeAndReleaseReadLock(localCursor);
        abort(localTxn);
        throw e;
      }
      catch (Exception e)
      {
        closeAndReleaseReadLock(localCursor);
        abort(localTxn);
        throw new ChangelogException(e);
      }
    }
        if (localTxn != null)
    private void abort(Transaction localTxn)
    {
      if (localTxn != null)
      {
        try
        {
          try
          {
            localTxn.abort();
          }
          catch (DatabaseException ignore)
          {
            // Ignore.
          }
          localTxn.abort();
        }
        throw e;
        catch (DatabaseException ignore)
        {
          // Ignore.
        }
      }
    }
@@ -689,7 +717,7 @@
        }
        catch (DatabaseException e)
        {
          replicationServer.handleUnexpectedDatabaseException(e);
          handleUnexpectedDatabaseException(e);
        }
      }
    }
@@ -722,7 +750,7 @@
        }
        catch (DatabaseException e)
        {
          replicationServer.handleUnexpectedDatabaseException(e);
          handleUnexpectedDatabaseException(e);
        }
      }
    }
@@ -731,20 +759,27 @@
     * Get the next ChangeNumber in the database from this Cursor.
     *
     * @return The next ChangeNumber in the database from this cursor.
     * @throws DatabaseException In case of underlying database problem.
     * @throws ChangelogException In case of underlying database problem.
     */
    public ChangeNumber nextChangeNumber() throws DatabaseException
    public ChangeNumber nextChangeNumber() throws ChangelogException
    {
      if (isClosed)
      {
        return null;
      }
      if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
      try
      {
        return null;
        if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
        {
          return null;
        }
        return toChangeNumber(key.getData());
      }
      return toChangeNumber(key.getData());
      catch (DatabaseException e)
      {
        throw new ChangelogException(e);
      }
    }
    /**
@@ -807,26 +842,32 @@
    /**
     * Delete the record at the current cursor position.
     *
     * @throws DatabaseException In case of database problem.
     * @throws ChangelogException In case of database problem.
     */
    public void delete() throws DatabaseException
    public void delete() throws ChangelogException
    {
      if (isClosed)
      {
        throw new IllegalStateException("ReplServerDBCursor already closed");
      }
      cursor.delete();
      try
      {
        cursor.delete();
      }
      catch (DatabaseException e)
      {
        throw new ChangelogException(e);
      }
    }
  } // ReplServerDBCursor
  }
  /**
   * Clears this change DB from the changes it contains.
   *
   * @throws Exception Throws an exception it occurs.
   * @throws DatabaseException Throws a DatabaseException when it occurs.
   * @throws ChangelogException In case of database problem.
   */
  public void clear() throws Exception, DatabaseException
  public void clear() throws ChangelogException
  {
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
@@ -915,7 +956,7 @@
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      handleUnexpectedDatabaseException(e);
    }
    finally
    {
@@ -927,6 +968,7 @@
  private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
      ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
      throws DatabaseException
  {
    Cursor cursor = db.openCursor(null, null);
    try
@@ -981,6 +1023,7 @@
  private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
      ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
      throws DatabaseException
  {
    Cursor cursor = db.openCursor(null, null);
    try