mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
05.33.2011 246e4192d3967e638aad1f12adc3e36be2aa82e2
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -30,6 +30,8 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.getBytes;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
@@ -37,8 +39,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.DataFormatException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -144,12 +146,12 @@
    // Initialize counter
    this.counterCurrValue = 1;
    cursor = db.openCursor(txn, null);
    status = cursor.getLast(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    try
    {
      try
      status = cursor.getLast(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
        ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        {
          status = cursor.getPrev(key, data, LockMode.DEFAULT);
@@ -158,38 +160,17 @@
        else
        {
          // counter record
          counterCurrValue = decodeCounterValue(data.getData())+1;
          counterCurrValue = decodeCounterValue(data.getData()) + 1;
          counterTsLimit = cn.getTime();
          break;
        }
      }
      catch (UnsupportedEncodingException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        if (txn != null)
        {
          try
          {
            txn.abort();
          } catch (DatabaseException e1)
          {
            // can't do much more. The ReplicationServer is shuting down.
          }
        }
        replicationServer.shutdown();
      }
      catch (DataFormatException e)
      {
        // Should never happen
      }
      counterCurrValue += distBackToCounterRecord;
    }
    counterCurrValue += distBackToCounterRecord;
    cursor.close();
    finally
    {
      cursor.close();
    }
  }
  /**
@@ -377,55 +358,38 @@
    String str = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      dbCloseLock.readLock().lock();
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
    }
    catch (DatabaseException e1)
    {
      dbCloseLock.readLock().unlock();
      return null;
    }
    try
    {
      try
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
        /* database is empty */
        return null;
      }
      str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        // First record is a counter record .. go next
        status = cursor.getNext(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          // DB contains only a counter record
          return null;
        }
        try
        else
        {
          str = new String(key.getData(), "UTF-8");
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            // First record is a counter record .. go next
            status = cursor.getNext(key, data, LockMode.DEFAULT);
            if (status != OperationStatus.SUCCESS)
            {
              // DB contains only a counter record
              return null;
            }
            else
            {
              cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
            }
          }
        } catch (UnsupportedEncodingException e)
        {
          // never happens
          cn = new ChangeNumber(decodeUTF8(key.getData()));
        }
      }
      finally
      {
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
@@ -437,11 +401,18 @@
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
  /**
   * Read the last Change from the database.
   *
   * @return the last ChangeNumber.
   */
  public ChangeNumber readLastChange()
@@ -449,43 +420,36 @@
    Cursor cursor = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      dbCloseLock.readLock().lock();
      try
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getLast(key, data,
          LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        cursor = db.openCursor(null, null);
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return null;
        }
        try
        {
          String str = new String(key.getData(), "UTF-8");
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              /* database only contain a counter record - don't know
               * how much it can be possible but ... */
              cn = null;
            }
          }
        }
        catch (UnsupportedEncodingException e)
        {
          // never happens
        }
        /* database is empty */
        return null;
      }
      finally
      String str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        closeLockedCursor(cursor);
        if (cursor.getPrev(key, data,
            LockMode.DEFAULT) != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
           */
          cn = null;
        }
      }
    }
    catch (DatabaseException e)
@@ -497,6 +461,11 @@
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -515,44 +484,56 @@
   */
  public class ReplServerDBCursor
  {
    private Cursor cursor = null;
    // The transaction that will protect the actions done with the cursor
     // The transaction that will protect the actions done with the cursor
    // Will be let null for a read cursor
    // Will be set non null for a write cursor
    private Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    private final Transaction txn;
    private final Cursor cursor;
    private final DatabaseEntry key;
    private final DatabaseEntry data;
    private boolean isClosed = false;
    /**
     * Creates a ReplServerDBCursor that can be used for browsing a
     * replicationServer db.
     *
     * @param startingChangeNumber The ChangeNumber from which the cursor must
     *        start.
     * @throws Exception When the startingChangeNumber does not exist.
     * @param startingChangeNumber
     *          The ChangeNumber from which the cursor must start.
     * @throws Exception
     *           When the startingChangeNumber does not exist.
     */
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
            throws Exception
        throws Exception
    {
      if (startingChangeNumber != null)
      {
        key = new ReplicationKey(startingChangeNumber);
      }
      else
      {
        key = new DatabaseEntry();
      }
      data = new DatabaseEntry();
      txn = null;
      // Take the lock. From now on, whatever error that happen in the life
      // of this cursor should end by unlocking that lock. We must also
      // unlock it when throwing an exception.
      dbCloseLock.readLock().lock();
      Cursor localCursor = null;
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();
        cursor = db.openCursor(txn, null);
        localCursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        {
          key = new ReplicationKey(startingChangeNumber);
          data = new DatabaseEntry();
          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
          if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
            if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
@@ -564,51 +545,75 @@
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
              if (localCursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                closeLockedCursor(cursor);
                dbCloseLock.readLock().lock();
                cursor = db.openCursor(txn, null);
                localCursor.close();
                localCursor = db.openCursor(txn, null);
              }
            }
          }
        }
        cursor = localCursor;
      }
      catch (Exception e)
      {
       // Unlocking is required before throwing any exception
        closeLockedCursor(cursor);
        throw (e);
        // Unlocking is required before throwing any exception
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        throw e;
      }
    }
    private ReplServerDBCursor() throws DatabaseException
    private ReplServerDBCursor() throws Exception
    {
      key = new DatabaseEntry();
      data = new DatabaseEntry();
      // We'll go on only if no close or no clear is running
      dbCloseLock.readLock().lock();
      Transaction localTxn = null;
      Cursor localCursor = null;
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();
        // Create the transaction that will protect whatever done with this
        // write cursor.
        txn = dbenv.beginTransaction();
        localTxn = dbenv.beginTransaction();
        localCursor = db.openCursor(localTxn, null);
        cursor = db.openCursor(txn, null);
        txn = localTxn;
        cursor = localCursor;
      }
      catch(DatabaseException e)
      catch (Exception e)
      {
        if (txn != null)
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        if (localTxn != null)
        {
          try
          {
            txn.abort();
            localTxn.abort();
          }
          catch (DatabaseException dbe)
          {}
          catch (DatabaseException ignore)
          {
            // Ignore.
          }
        }
        closeLockedCursor(cursor);
        throw (e);
        throw e;
      }
    }
@@ -617,10 +622,20 @@
     */
    public void close()
    {
      synchronized (this)
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (DatabaseException e)
      {
@@ -628,22 +643,29 @@
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.commit();
        } catch (DatabaseException e)
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          closeHasFailed = true;
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -655,14 +677,22 @@
     */
    public void abort()
    {
      if (cursor == null)
        return;
      synchronized (this)
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (LockConflictException e1)
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
@@ -674,22 +704,29 @@
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e)
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          closeHasFailed = true;
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -706,15 +743,8 @@
      {
        return null;
      }
      try
      {
        String csnString = new String(key.getData(), "UTF-8");
        return new ChangeNumber(csnString);
      } catch (UnsupportedEncodingException e)
      {
        // can't happen
        return null;
      }
      String csnString = decodeUTF8(key.getData());
      return new ChangeNumber(csnString);
    }
    /**
@@ -738,26 +768,29 @@
        {
          return null;
        }
        try
        {
          ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
          if(ReplicationDB.isaCounter(cn))
          ChangeNumber cn = new ChangeNumber(
              decodeUTF8(key.getData()));
          if (ReplicationDB.isaCounter(cn))
          {
            // counter record
            continue;
          }
          currentChange = ReplicationData.generateChange(data.getData());
        } catch (Exception e) {
          currentChange = ReplicationData.generateChange(data
              .getData());
        }
        catch (Exception e)
        {
          /*
           * An error happening trying to convert the data from the
           * replicationServer database to an Update Message.
           * This can only happen if the database is corrupted.
           * There is not much more that we can do at this point except trying
           * to continue with the next record.
           * In such case, it is therefore possible that we miss some changes.
           * TODO. log an error message.
           * TODO : REPAIR : Such problem should be handled by the
           *        repair functionality.
           * replicationServer database to an Update Message. This can only
           * happen if the database is corrupted. There is not much more that we
           * can do at this point except trying to continue with the next
           * record. In such case, it is therefore possible that we miss some
           * changes. TODO. log an error message. TODO : REPAIR : Such problem
           * should be handled by the repair functionality.
           */
        }
      }
@@ -859,7 +892,7 @@
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        String csnString = new String(key.getData(), "UTF-8");
        String csnString = decodeUTF8(key.getData());
        cn = new ChangeNumber(csnString);
        if (cn.getServerId() != 0)
        {
@@ -900,7 +933,7 @@
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
        cn = new ChangeNumber(decodeUTF8(key.getData()));
      }
      else
      {
@@ -915,7 +948,7 @@
      }
      while (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
        cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        {
          // regular change record
@@ -952,18 +985,6 @@
        }
      }
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (DataFormatException e)
    {
      // Should never happen
    }
    finally
    {
      if (cursor != null)
@@ -975,7 +996,7 @@
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
          // can't do much more. The ReplicationServer is shutting down.
        }
      }
    }
@@ -996,33 +1017,22 @@
   * Decode the provided database entry as a the value of a counter.
   * @param entry The provided entry.
   * @return The counter value.
   * @throws DataFormatException
   */
  private static int decodeCounterValue(byte[] entry)
  throws DataFormatException
  {
    try
    {
      String numAckStr = new String(entry, 0, entry.length, "UTF-8");
      return Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    String numAckStr = decodeUTF8(entry);
    return Integer.parseInt(numAckStr);
  }
  /**
   * Encode the provided counter value in a database entry.
   * @param entry The provided entry.
   * @return The databse entry with the counter value encoded inside..
   * @throws UnsupportedEncodingException
   * @return The database entry with the counter value encoded inside.
   */
  static private DatabaseEntry encodeCounterValue(int value)
  throws UnsupportedEncodingException
  {
    DatabaseEntry entry = new DatabaseEntry();
    entry.setData(String.valueOf(value).getBytes("UTF-8"));
    entry.setData(getBytes(String.valueOf(value)));
    return entry;
  }