mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
19.33.2008 111c22a17e3ab4bdca9052891a810f5ab7cea6b6
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -37,6 +37,7 @@
import org.opends.server.types.DN;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -63,7 +64,11 @@
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
 /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param serverId The identifier of the LDAP server.
@@ -86,6 +91,8 @@
    db = dbenv.getOrAddDb(serverId, baseDn,
        replicationServer.getReplicationServerDomain(baseDn,
        true).getGenerationId());
    dbCloseLock = new ReentrantReadWriteLock(true);
  }
  /**
@@ -108,6 +115,7 @@
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
@@ -128,6 +136,10 @@
          txn.abort();
          txn = null;
        }
        finally
        {
          dbCloseLock.readLock().unlock();
        }
      }
      if (!done)
      {
@@ -190,8 +202,17 @@
  {
    try
    {
      db.close();
    } catch (DatabaseException e)
      dbCloseLock.writeLock().lock();
      try
      {
        db.close();
      }
      finally
      {
        dbCloseLock.writeLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
@@ -223,6 +244,7 @@
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor()
@@ -242,10 +264,13 @@
    try
    {
        cursor = db.openCursor(null, null);
    } catch (DatabaseException e1)
      dbCloseLock.readLock().lock();
      cursor = db.openCursor(null, null);
    }
    catch (DatabaseException e1)
    {
        return null;
      dbCloseLock.readLock().unlock();
      return null;
    }
    try
    {
@@ -253,6 +278,7 @@
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      cursor.close();
      dbCloseLock.readLock().unlock();
      if (status != OperationStatus.SUCCESS)
      {
        /* database is empty */
@@ -268,11 +294,14 @@
      return new ChangeNumber(str);
    } catch (DatabaseException e)
    {
      try {
      cursor.close();
      try
      {
        cursor.close();
        dbCloseLock.readLock().unlock();
      }
      catch (DatabaseException dbe)
      {
        // The db is dead - let's only log.
      }
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
@@ -295,11 +324,13 @@
    try
    {
      dbCloseLock.readLock().lock();
      cursor = db.openCursor(null, null);
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
      cursor.close();
      dbCloseLock.readLock().unlock();
      if (status != OperationStatus.SUCCESS)
      {
        /* database is empty */
@@ -355,10 +386,14 @@
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
            throws Exception
    {
      cursor = db.openCursor(txn, null);
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();
        cursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        {
          key = new ReplicationKey(startingChangeNumber);
@@ -367,20 +402,35 @@
          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              // Unlocking is required before throwing any exception
              dbCloseLock.readLock().unlock();
              throw new Exception("ChangeNumber not available");
            }
            else
            {
              // We can move close to the startingChangeNumber.
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                cursor.close();
                cursor = db.openCursor(txn, null);
                try
                {
                  cursor.close();
                  cursor = db.openCursor(txn, null);
                }
                catch(Exception e)
                {
                  // Unlocking is required before throwing any exception
                  dbCloseLock.readLock().unlock();
                  throw(e);
                }
              }
            }
          }
@@ -388,6 +438,8 @@
      }
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        dbCloseLock.readLock().unlock();
        cursor.close();
        throw (e);
      }
@@ -395,8 +447,18 @@
    private ReplServerDBCursor() throws DatabaseException
    {
      txn = dbenv.beginTransaction();
      cursor = db.openCursor(txn, null);
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();
        txn = dbenv.beginTransaction();
        cursor = db.openCursor(txn, null);
      }
      catch(DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        throw (e);
      }
    }
    /**
@@ -414,12 +476,15 @@
      }
      catch (DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
      if (txn != null)
      {
        try
@@ -434,6 +499,7 @@
          replicationServer.shutdown();
        }
      }
      dbCloseLock.readLock().unlock();
    }
    /**
@@ -460,6 +526,8 @@
      }
      catch (DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
@@ -480,6 +548,7 @@
          replicationServer.shutdown();
        }
      }
      dbCloseLock.readLock().unlock();
    }
    /**
@@ -556,7 +625,7 @@
    {
      cursor.delete();
    }
  }
  } // ReplServerDBCursor
  /**
   * Clears this change DB from the changes it contains.
@@ -566,10 +635,38 @@
   */
  public void clear() throws Exception, DatabaseException
  {
    // Clears the changes
    dbenv.clearDb(this.toString());
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
    try
    {
      String dbName = db.getDatabaseName();
    // Clears the reference to this serverID
    dbenv.clearServerId(baseDn, serverId);
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDn, serverId);
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      // Clears the changes
      dbenv.clearDb(dbName);
      db = null;
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
          e.getMessage() + " " +
          stackTraceToSingleLineString(e)));
      logError(mb.toMessage());
    }
    finally
    {
      // Relax the waiting users
      dbCloseLock.writeLock().unlock();
    }
  }
}