mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.33.2011 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
import java.io.UnsupportedEncodingException;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
@@ -64,9 +56,6 @@
  private int serverId;
  private String baseDn;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
@@ -180,98 +169,48 @@
   */
  public void addEntries(List<UpdateMsg> changes)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      dbCloseLock.readLock().lock();
      try
      {
        dbCloseLock.readLock().lock();
        try
        for (UpdateMsg change : changes)
        {
          txn = dbenv.beginTransaction();
          DatabaseEntry key = new ReplicationKey(
              change.getChangeNumber());
          DatabaseEntry data = new ReplicationData(change);
          for (UpdateMsg change : changes)
          if ((counterCurrValue != 0)
              && (counterCurrValue % counterWindowSize == 0))
          {
            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
            DatabaseEntry data = new ReplicationData(change);
            if ((counterCurrValue!=0) &&
                (counterCurrValue%counterWindowSize == 0))
            {
              // enough changes to generate a counter record - wait for the next
              // change fo time
              counterTsLimit = change.getChangeNumber().getTime();
            }
            if ((counterTsLimit!=0)
                && (change.getChangeNumber().getTime() != counterTsLimit))
            {
              // Write the counter record
              DatabaseEntry counterKey = new ReplicationKey(
                  new ChangeNumber(
                  change.getChangeNumber().getTime(),
                  0, 0));
              DatabaseEntry counterValue =
                encodeCounterValue(counterCurrValue-1);
              db.put(txn, counterKey, counterValue);
              counterTsLimit=0;
            }
            db.put(txn, key, data);
            counterCurrValue++;
            // enough changes to generate a counter record - wait for the next
            // change of time
            counterTsLimit = change.getChangeNumber().getTime();
          }
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (LockConflictException e)
        {
          // Try again.
        }
        finally
        {
          if (txn != null)
          if ((counterTsLimit != 0)
              && (change.getChangeNumber().getTime() != counterTsLimit))
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
            // Write the counter record
            DatabaseEntry counterKey = new ReplicationKey(
                new ChangeNumber(change.getChangeNumber().getTime(),
                    0, 0));
            DatabaseEntry counterValue =
              encodeCounterValue(counterCurrValue - 1);
            db.put(null, counterKey, counterValue);
            counterTsLimit = 0;
          }
          dbCloseLock.readLock().unlock();
          db.put(null, key, data);
          counterCurrValue++;
        }
      }
      if (!done)
      finally
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        replicationServer.shutdown();
        dbCloseLock.readLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
    }
  }
@@ -334,13 +273,24 @@
    return new ReplServerDBCursor();
  }
  private void closeLockedCursor(Cursor cursor)
    throws DatabaseException
      throws DatabaseException
  {
    try
    {
      if (cursor != null)
        cursor.close();
      {
        try
        {
          cursor.close();
        }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
    }
    finally
    {
@@ -358,53 +308,53 @@
    String str = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      dbCloseLock.readLock().lock();
      try
      {
        /* database is empty */
        return null;
      }
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
      str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        // First record is a counter record .. go next
        status = cursor.getNext(key, data, LockMode.DEFAULT);
        cursor = db.openCursor(null, null);
        OperationStatus status = cursor.getFirst(key, data,
            LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          // DB contains only a counter record
          /* database is empty */
          return null;
        }
        else
        str = decodeUTF8(key.getData());
        cn = new ChangeNumber(str);
        if (ReplicationDB.isaCounter(cn))
        {
          cn = new ChangeNumber(decodeUTF8(key.getData()));
          // First record is a counter record .. go next
          status = cursor.getNext(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          {
            // DB contains only a counter record
            return null;
          }
          else
          {
            cn = new ChangeNumber(decodeUTF8(key.getData()));
          }
        }
      }
      finally
      {
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -420,57 +370,56 @@
    Cursor cursor = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getLast(key, data,
          LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      dbCloseLock.readLock().lock();
      try
      {
        /* database is empty */
        return null;
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        OperationStatus status = cursor.getLast(key, data,
            LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return null;
        }
        String str = decodeUTF8(key.getData());
        cn = new ChangeNumber(str);
        if (ReplicationDB.isaCounter(cn))
        {
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              != OperationStatus.SUCCESS)
          {
            /*
             * database only contain a counter record - don't know how much it
             * can be possible but ...
             */
            cn = null;
          }
          else
          {
            str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            // There can't be 2 counter record next to each other
          }
        }
      }
      String str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      finally
      {
        if (cursor.getPrev(key, data,
            LockMode.DEFAULT) != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
           */
          cn = null;
        }
        else
        {
          str = decodeUTF8(key.getData());
          cn= new ChangeNumber(str);
          // There can't be 2 counter record next to each other
        }
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -496,85 +445,80 @@
    DatabaseEntry key = new ReplicationKey(changeNumber);
    DatabaseEntry data = new DatabaseEntry();
    Transaction txn = null;
    dbCloseLock.readLock().lock();
    try
    {
      cursor = db.openCursor(txn, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
              OperationStatus.SUCCESS)
      dbCloseLock.readLock().lock();
      try
      {
        // We can move close to the changeNumber.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
        cursor = db.openCursor(null, null);
        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
            == OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          // We can move close to the changeNumber.
          // Let's move to the previous change.
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            if (cursor.getPrev(key, data,
                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
            String str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            if (ReplicationDB.isaCounter(cn))
            {
              // database starts with a counter record.
              cn = null;
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
              {
                // database starts with a counter record.
                cn = null;
              }
              else
              {
                str = decodeUTF8(key.getData());
                cn = new ChangeNumber(str);
                // There can't be 2 counter record next to each other
              }
            }
            else
          }
          // else, there was no change previous to our changeNumber.
        }
        else
        {
          // We could not move the cursor past to the changeNumber
          // Check if the last change is older than changeNumber
          if (cursor.getLast(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            String str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            if (ReplicationDB.isaCounter(cn))
            {
              str = decodeUTF8(key.getData());
              cn= new ChangeNumber(str);
              // There can't be 2 counter record next to each other
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
              {
                /*
                 * database only contain a counter record, should not be
                 * possible, but Ok, let's just say no change Number
                 */
                cn = null;
              }
              else
              {
                str = decodeUTF8(key.getData());
                cn = new ChangeNumber(str);
                // There can't be 2 counter record next to each other
              }
            }
          }
        }
        // else, there was no change previous to our changeNumber.
      }
      else
      finally
      {
        // We could not move the cursor past to the changeNumber
        // Check if the last change is older than changeNumber
        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data,
              LockMode.DEFAULT) != OperationStatus.SUCCESS)
            {
              /*
               * database only contain a counter record, should not be
               * possible, but Ok, let's just say no change Number
               */
              cn = null;
            }
            else
            {
              str = decodeUTF8(key.getData());
              cn= new ChangeNumber(str);
              // There can't be 2 counter record next to each other
            }
          }
        }
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      // TODO: Verify if shutting down replication is the right thing to do
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -669,14 +613,7 @@
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        closeLockedCursor(localCursor);
        throw e;
      }
    }
@@ -703,14 +640,7 @@
      }
      catch (Exception e)
      {
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        closeLockedCursor(localCursor);
        if (localTxn != null)
        {
@@ -741,41 +671,19 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
        try
        {
          txn.commit();
          // No need for durability when purging.
          txn.commit(Durability.COMMIT_NO_SYNC);
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -796,26 +704,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
@@ -825,18 +714,9 @@
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -972,143 +852,140 @@
    int distToCounterRecord1 = 0;
    int distBackToCounterRecord2 = 0;
    int count=0;
    Cursor cursor = null;
    Transaction txn = null;
    OperationStatus status;
    try
    {
      ChangeNumber cn ;
      if ((start==null)&&(stop==null))
        return (int)db.count();
      // Step 1 : from the start point, traverse db to the next counter record
      // or to the stop point.
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(txn, null);
      if (start != null)
      Cursor cursor = null;
      try
      {
        key = new ReplicationKey(start);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
      }
      else
      {
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
        ChangeNumber cn ;
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        String csnString = decodeUTF8(key.getData());
        cn = new ChangeNumber(csnString);
        if (cn.getServerId() != 0)
        if ((start==null)&&(stop==null))
          return (int)db.count();
        // Step 1 : from the start point, traverse db to the next counter record
        // or to the stop point.
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        if (start != null)
        {
          // reached a regular change record
          // test whether we reached the 'stop' target
          if (!cn.newer(stop))
          {
            // let's loop
            distToCounterRecord1++;
            status = cursor.getNext(key, data, LockMode.DEFAULT);
          }
          else
          {
            // reached the end
            break;
          }
          key = new ReplicationKey(start);
          status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
          if (status == OperationStatus.NOTFOUND)
            status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
        }
        else
        {
          // counter record
          counterRecord1 = decodeCounterValue(data.getData());
          break;
          status = cursor.getNext(key, data, LockMode.DEFAULT);
        }
      }
      cursor.close();
      // cases
      //
      if (counterRecord1==0)
        return distToCounterRecord1;
        while (status == OperationStatus.SUCCESS)
        {
          // test whether the record is a regular change or a counter
          String csnString = decodeUTF8(key.getData());
          cn = new ChangeNumber(csnString);
          if (cn.getServerId() != 0)
          {
            // reached a regular change record
            // test whether we reached the 'stop' target
            if (!cn.newer(stop))
            {
              // let's loop
              distToCounterRecord1++;
              status = cursor.getNext(key, data, LockMode.DEFAULT);
            }
            else
            {
              // reached the end
              break;
            }
          }
          else
          {
            // counter record
            counterRecord1 = decodeCounterValue(data.getData());
            break;
          }
        }
        cursor.close();
      // Step 2 : from the stop point, traverse db to the next counter record
      // or to the start point.
      txn = null;
      data = new DatabaseEntry();
      key = new ReplicationKey(stop);
      cursor = db.openCursor(txn, null);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(decodeUTF8(key.getData()));
      }
      else
      {
        key = new DatabaseEntry();
        // cases
        //
        if (counterRecord1==0)
          return distToCounterRecord1;
        // Step 2 : from the stop point, traverse db to the next counter record
        // or to the start point.
        data = new DatabaseEntry();
        status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        key = new ReplicationKey(stop);
        cursor = db.openCursor(null, null);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.SUCCESS)
        {
          /* database is empty */
          return 0;
          cn = new ChangeNumber(decodeUTF8(key.getData()));
        }
      }
      while (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        else
        {
          // regular change record
          if (!cn.older(start))
          key = new DatabaseEntry();
          data = new DatabaseEntry();
          status = cursor.getLast(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          {
            distBackToCounterRecord2++;
            status = cursor.getPrev(key, data, LockMode.DEFAULT);
            /* database is empty */
            return 0;
          }
        }
        while (status == OperationStatus.SUCCESS)
        {
          cn = new ChangeNumber(decodeUTF8(key.getData()));
          if (!ReplicationDB.isaCounter(cn))
          {
            // regular change record
            if (!cn.older(start))
            {
              distBackToCounterRecord2++;
              status = cursor.getPrev(key, data, LockMode.DEFAULT);
            }
            else
              break;
          }
          else
          {
            // counter record
            counterRecord2 = decodeCounterValue(data.getData());
            break;
          }
        }
        else
        cursor.close();
        // Step 3 : Now consolidates the result
        if (counterRecord1!=0)
        {
          // counter record
          counterRecord2 = decodeCounterValue(data.getData());
          break;
          if (counterRecord1 == counterRecord2)
          {
            // only one cp between from and to - no need to use it
            count = distToCounterRecord1 + distBackToCounterRecord2;
          }
          else
          {
            // 2 cp between from and to
            count = distToCounterRecord1 + (counterRecord2-counterRecord1)
            + distBackToCounterRecord2;
          }
        }
      }
      cursor.close();
      // Step 3 : Now consolidates the result
      if (counterRecord1!=0)
      finally
      {
        if (counterRecord1 == counterRecord2)
        if (cursor != null)
        {
          // only one cp between from and to - no need to use it
          count = distToCounterRecord1 + distBackToCounterRecord2;
        }
        else
        {
          // 2 cp between from and to
          count = distToCounterRecord1 + (counterRecord2-counterRecord1)
            + distBackToCounterRecord2;
          cursor.close();
        }
      }
    }
    finally
    catch (DatabaseException e)
    {
      if (cursor != null)
        cursor.close();
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shutting down.
        }
      }
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    return count;
  }