mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
05.33.2011 6b1e3bf06de1327d05b8cbefcd930e5974f556d3
OpenDJ-107: Potential for leaking DB cursors in replication databases
11 files modified
943 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDB.java 219 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 82 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 422 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 13 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 27 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/util/StaticUtils.java 64 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java 84 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -478,8 +478,7 @@
        /* the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         */
        ReplServerDBCursor cursor;
        cursor = db.openDeleteCursor();
        ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        {
@@ -517,7 +516,7 @@
            throw (e);
          }
        }
        catch (DatabaseException e)
        catch (Exception e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -29,6 +29,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
@@ -264,14 +265,7 @@
          /* database is empty */
          return 0;
        }
        try
        {
          str = new String(key.getData(), "UTF-8");
        } catch (UnsupportedEncodingException e)
        {
          // never happens, return anyway
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
      }
@@ -341,14 +335,7 @@
          /* database is empty */
          return 0;
        }
        try
        {
          str = new String(key.getData(), "UTF-8");
        } catch (UnsupportedEncodingException e)
        {
          // never happens, returns anyway
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
      }
@@ -387,47 +374,57 @@
   */
  public class DraftCNDBCursor
  {
    private Cursor cursor = null;
    private final Cursor cursor;
    // The transaction that will protect the actions done with the cursor
    // Will be let null for a read cursor
    // Will be set non null for a write cursor
    private Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry entry = new DatabaseEntry();
    private final Transaction txn;
    private final DatabaseEntry key;
    private final DatabaseEntry entry;
    private boolean isClosed = false;
    /**
     * Creates a cursor that can be used for browsing the db.
     *
     * @param startingDraftCN the draftCN from which the cursor must
     *                        start.
     * @throws Exception      when the startingDraftCN does not exist.
     * @param startingDraftCN
     *          the draftCN from which the cursor must start.
     * @throws Exception
     *           when the startingDraftCN does not exist.
     */
    private DraftCNDBCursor(int startingDraftCN) throws Exception
    {
      // For consistency with other constructor, we'll use a local here,
      // even though it's always null.
      final Transaction localTxn = null;
      Cursor localCursor = null;
      this.key = new ReplicationDraftCNKey(startingDraftCN);
      this.entry = new DatabaseEntry();
      // Take the lock. From now on, whatever error that happen in the life
      // of this cursor should end by unlocking that lock. We must also
      // unlock it when throwing an exception.
      dbCloseLock.readLock().lock();
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();
        cursor = db.openCursor(txn, null);
        localCursor = db.openCursor(localTxn, null);
        if (startingDraftCN >= 0)
        {
          key = new ReplicationDraftCNKey(startingDraftCN);
          entry = new DatabaseEntry();
          if (cursor.getSearchKey(key, entry, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          if (localCursor.getSearchKey(
              key, entry, LockMode.DEFAULT) != OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (cursor.getSearchKeyRange(key, entry, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            if (localCursor.getSearchKeyRange(key, entry,
                LockMode.DEFAULT) != OperationStatus.SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              throw new Exception("ChangeLog Draft Change Number " +
                  startingDraftCN + " is not available");
              throw new Exception("ChangeLog Draft Change Number "
                  + startingDraftCN + " is not available");
            }
            else
            {
@@ -435,57 +432,76 @@
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              if (localCursor.getPrev(
                  key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS)
              {
                closeLockedCursor(cursor);
                dbCloseLock.readLock().lock();
                cursor = db.openCursor(txn, null);
                localCursor.close();
                localCursor = db.openCursor(localTxn, null);
              }
            }
          }
          else
          {
            // success : key has the right value
          }
        }
        this.txn = localTxn;
        this.cursor = localCursor;
      }
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        closeLockedCursor(cursor);
        throw (e);
        closeLockedCursor(localCursor);
        throw e;
      }
    }
    private DraftCNDBCursor() throws DatabaseException
    private DraftCNDBCursor() throws Exception
    {
      Transaction localTxn = null;
      Cursor localCursor = null;
      this.key = new DatabaseEntry();
      this.entry = new DatabaseEntry();
      // We'll go on only if no close or no clear is running
      dbCloseLock.readLock().lock();
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();
        // Create the transaction that will protect whatever done with this
        // write cursor.
        txn = dbenv.beginTransaction();
        localTxn = dbenv.beginTransaction();
        localCursor = db.openCursor(localTxn, null);
        cursor = db.openCursor(txn, null);
        this.txn = localTxn;
        this.cursor = localCursor;
      }
      catch(DatabaseException e)
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        if (txn != null)
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (DatabaseException ignored)
        {
          // Ignore.
          TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
        }
        if (localTxn != null)
        {
          try
          {
            txn.abort();
            localTxn.abort();
          }
          catch (DatabaseException dbe)
          {}
          catch (DatabaseException ignored)
          {
            // Ignore.
            TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
          }
        }
        closeLockedCursor(cursor);
        throw (e);
        throw e;
      }
    }
@@ -494,33 +510,50 @@
     */
    public void close()
    {
      synchronized (this)
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (DatabaseException e)
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.commit();
        } catch (DatabaseException e)
        }
        catch (Exception e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          closeHasFailed = true;
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -532,49 +565,63 @@
     */
    public void abort()
    {
      if (cursor == null)
        return;
      synchronized (this)
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (LockConflictException e1)
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e)
        }
        catch (Exception e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          closeHasFailed = true;
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
     * Getter for the value field of the current cursor.
     * @return The current value field.
     * @throws DatabaseException When an error happens.
     */
    public String currentValue() throws DatabaseException
    public String currentValue()
    {
      try
      {
@@ -598,9 +645,8 @@
    /**
     * Getter for the serviceID field of the current cursor.
     * @return The current serviceID.
     * @throws DatabaseException When an error happens.
     */
    public String currentServiceID() throws DatabaseException
    public String currentServiceID()
    {
      try
      {
@@ -616,7 +662,7 @@
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return null;
    }
@@ -624,9 +670,8 @@
    /**
     * Returns the replication changeNumber associated with the current key.
     * @return the replication changeNumber
     * @throws DatabaseException when a problem occurs.
     */
    public ChangeNumber currentChangeNumber() throws DatabaseException
    public ChangeNumber currentChangeNumber()
    {
      try
      {
@@ -672,6 +717,16 @@
    {
      cursor.delete();
    }
    /**
     * Returns the current key associated with this cursor.
     *
     * @return The current key associated with this cursor.
     */
    public DatabaseEntry getKey()
    {
      return key;
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -417,16 +417,16 @@
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw (e);
          throw e;
        }
      }
      catch (DatabaseException e)
      catch (Exception e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        throw (e);
        throw e;
      }
    }
  }
opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -124,7 +125,7 @@
   */
  public int getDraftCN()
  {
    ReplicationDraftCNKey sk = (ReplicationDraftCNKey)this.draftCNDbCursor.key;
    ReplicationDraftCNKey sk = (ReplicationDraftCNKey) draftCNDbCursor.getKey();
    int currentSeqnum = sk.getDraftCN();
    return currentSeqnum;
  }
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -350,46 +350,61 @@
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (int serverId : replicationServerDomain.getServers())
          try
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if (iterator != null)
            /* fill the lateQueue */
            for (int serverId : replicationServerDomain.getServers())
            {
              if (iterator.getChange() != null)
              ChangeNumber lastCsn = serverState
                  .getMaxChangeNumber(serverId);
              ReplicationIterator iterator = replicationServerDomain
                  .getChangelogIterator(serverId, lastCsn);
              if (iterator != null)
              {
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            // The loop below relies on the fact that it is sorted based
            // on the currentChange of each iterator to consider the next
            // change across all servers.
            //
            // Hence it is necessary to remove and eventual add again an
            // iterator when looping in order to keep consistent the order of
            // the iterators (see ReplicationIteratorComparator.
            while (!iteratorSortedSet.isEmpty()
                && (lateQueue.count() < 100)
                && (lateQueue.bytesCount() < 50000))
            {
              ReplicationIterator iterator = iteratorSortedSet
                  .first();
              iteratorSortedSet.remove(iterator);
              lateQueue.add(iterator.getChange());
              if (iterator.next())
              {
                iteratorSortedSet.add(iterator);
              } else
              }
              else
              {
                iterator.releaseCursor();
              }
            }
          }
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change across all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() &&
              (lateQueue.count()<100) &&
              (lateQueue.bytesCount()<50000) )
          finally
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
            lateQueue.add(iterator.getChange());
            if (iterator.next())
              iteratorSortedSet.add(iterator);
            else
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
          }
          for (ReplicationIterator iterator : iteratorSortedSet)
          {
            iterator.releaseCursor();
          }
          /*
           * If the late queue is empty then we could not find any
           * messages in the replication log so the remote serevr is not
@@ -527,9 +542,16 @@
              // if that iterator has changes, then it is a candidate
              // it is added in the sorted list at a position given by its
              // current change (see ReplicationIteratorComparator).
              if ((iterator != null) && (iterator.getChange() != null))
              if (iterator != null)
              {
                iteratorSortedSet.add(iterator);
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            UpdateMsg msg = iteratorSortedSet.first().getChange();
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -30,6 +30,8 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.getBytes;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
@@ -37,8 +39,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.DataFormatException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -144,12 +146,12 @@
    // Initialize counter
    this.counterCurrValue = 1;
    cursor = db.openCursor(txn, null);
    status = cursor.getLast(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    try
    {
      try
      status = cursor.getLast(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
        ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        {
          status = cursor.getPrev(key, data, LockMode.DEFAULT);
@@ -158,38 +160,17 @@
        else
        {
          // counter record
          counterCurrValue = decodeCounterValue(data.getData())+1;
          counterCurrValue = decodeCounterValue(data.getData()) + 1;
          counterTsLimit = cn.getTime();
          break;
        }
      }
      catch (UnsupportedEncodingException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        if (txn != null)
        {
          try
          {
            txn.abort();
          } catch (DatabaseException e1)
          {
            // can't do much more. The ReplicationServer is shuting down.
          }
        }
        replicationServer.shutdown();
      }
      catch (DataFormatException e)
      {
        // Should never happen
      }
      counterCurrValue += distBackToCounterRecord;
    }
    counterCurrValue += distBackToCounterRecord;
    cursor.close();
    finally
    {
      cursor.close();
    }
  }
  /**
@@ -377,55 +358,38 @@
    String str = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      dbCloseLock.readLock().lock();
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
    }
    catch (DatabaseException e1)
    {
      dbCloseLock.readLock().unlock();
      return null;
    }
    try
    {
      try
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
        /* database is empty */
        return null;
      }
      str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        // First record is a counter record .. go next
        status = cursor.getNext(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          // DB contains only a counter record
          return null;
        }
        try
        else
        {
          str = new String(key.getData(), "UTF-8");
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            // First record is a counter record .. go next
            status = cursor.getNext(key, data, LockMode.DEFAULT);
            if (status != OperationStatus.SUCCESS)
            {
              // DB contains only a counter record
              return null;
            }
            else
            {
              cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
            }
          }
        } catch (UnsupportedEncodingException e)
        {
          // never happens
          cn = new ChangeNumber(decodeUTF8(key.getData()));
        }
      }
      finally
      {
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
@@ -437,11 +401,18 @@
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
  /**
   * Read the last Change from the database.
   *
   * @return the last ChangeNumber.
   */
  public ChangeNumber readLastChange()
@@ -449,43 +420,36 @@
    Cursor cursor = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      dbCloseLock.readLock().lock();
      try
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getLast(key, data,
          LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        cursor = db.openCursor(null, null);
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return null;
        }
        try
        {
          String str = new String(key.getData(), "UTF-8");
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              /* database only contain a counter record - don't know
               * how much it can be possible but ... */
              cn = null;
            }
          }
        }
        catch (UnsupportedEncodingException e)
        {
          // never happens
        }
        /* database is empty */
        return null;
      }
      finally
      String str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        closeLockedCursor(cursor);
        if (cursor.getPrev(key, data,
            LockMode.DEFAULT) != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
           */
          cn = null;
        }
      }
    }
    catch (DatabaseException e)
@@ -497,6 +461,11 @@
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -515,44 +484,56 @@
   */
  public class ReplServerDBCursor
  {
    private Cursor cursor = null;
    // The transaction that will protect the actions done with the cursor
     // The transaction that will protect the actions done with the cursor
    // Will be let null for a read cursor
    // Will be set non null for a write cursor
    private Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    private final Transaction txn;
    private final Cursor cursor;
    private final DatabaseEntry key;
    private final DatabaseEntry data;
    private boolean isClosed = false;
    /**
     * Creates a ReplServerDBCursor that can be used for browsing a
     * replicationServer db.
     *
     * @param startingChangeNumber The ChangeNumber from which the cursor must
     *        start.
     * @throws Exception When the startingChangeNumber does not exist.
     * @param startingChangeNumber
     *          The ChangeNumber from which the cursor must start.
     * @throws Exception
     *           When the startingChangeNumber does not exist.
     */
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
            throws Exception
        throws Exception
    {
      if (startingChangeNumber != null)
      {
        key = new ReplicationKey(startingChangeNumber);
      }
      else
      {
        key = new DatabaseEntry();
      }
      data = new DatabaseEntry();
      txn = null;
      // Take the lock. From now on, whatever error that happen in the life
      // of this cursor should end by unlocking that lock. We must also
      // unlock it when throwing an exception.
      dbCloseLock.readLock().lock();
      Cursor localCursor = null;
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();
        cursor = db.openCursor(txn, null);
        localCursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        {
          key = new ReplicationKey(startingChangeNumber);
          data = new DatabaseEntry();
          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
          if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
            if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
@@ -564,51 +545,75 @@
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
              if (localCursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                closeLockedCursor(cursor);
                dbCloseLock.readLock().lock();
                cursor = db.openCursor(txn, null);
                localCursor.close();
                localCursor = db.openCursor(txn, null);
              }
            }
          }
        }
        cursor = localCursor;
      }
      catch (Exception e)
      {
       // Unlocking is required before throwing any exception
        closeLockedCursor(cursor);
        throw (e);
        // Unlocking is required before throwing any exception
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        throw e;
      }
    }
    private ReplServerDBCursor() throws DatabaseException
    private ReplServerDBCursor() throws Exception
    {
      key = new DatabaseEntry();
      data = new DatabaseEntry();
      // We'll go on only if no close or no clear is running
      dbCloseLock.readLock().lock();
      Transaction localTxn = null;
      Cursor localCursor = null;
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();
        // Create the transaction that will protect whatever done with this
        // write cursor.
        txn = dbenv.beginTransaction();
        localTxn = dbenv.beginTransaction();
        localCursor = db.openCursor(localTxn, null);
        cursor = db.openCursor(txn, null);
        txn = localTxn;
        cursor = localCursor;
      }
      catch(DatabaseException e)
      catch (Exception e)
      {
        if (txn != null)
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        if (localTxn != null)
        {
          try
          {
            txn.abort();
            localTxn.abort();
          }
          catch (DatabaseException dbe)
          {}
          catch (DatabaseException ignore)
          {
            // Ignore.
          }
        }
        closeLockedCursor(cursor);
        throw (e);
        throw e;
      }
    }
@@ -617,10 +622,20 @@
     */
    public void close()
    {
      synchronized (this)
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (DatabaseException e)
      {
@@ -628,22 +643,29 @@
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.commit();
        } catch (DatabaseException e)
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          closeHasFailed = true;
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -655,14 +677,22 @@
     */
    public void abort()
    {
      if (cursor == null)
        return;
      synchronized (this)
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (LockConflictException e1)
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
@@ -674,22 +704,29 @@
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e)
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          closeHasFailed = true;
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -706,15 +743,8 @@
      {
        return null;
      }
      try
      {
        String csnString = new String(key.getData(), "UTF-8");
        return new ChangeNumber(csnString);
      } catch (UnsupportedEncodingException e)
      {
        // can't happen
        return null;
      }
      String csnString = decodeUTF8(key.getData());
      return new ChangeNumber(csnString);
    }
    /**
@@ -738,26 +768,29 @@
        {
          return null;
        }
        try
        {
          ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
          if(ReplicationDB.isaCounter(cn))
          ChangeNumber cn = new ChangeNumber(
              decodeUTF8(key.getData()));
          if (ReplicationDB.isaCounter(cn))
          {
            // counter record
            continue;
          }
          currentChange = ReplicationData.generateChange(data.getData());
        } catch (Exception e) {
          currentChange = ReplicationData.generateChange(data
              .getData());
        }
        catch (Exception e)
        {
          /*
           * An error happening trying to convert the data from the
           * replicationServer database to an Update Message.
           * This can only happen if the database is corrupted.
           * There is not much more that we can do at this point except trying
           * to continue with the next record.
           * In such case, it is therefore possible that we miss some changes.
           * TODO. log an error message.
           * TODO : REPAIR : Such problem should be handled by the
           *        repair functionality.
           * replicationServer database to an Update Message. This can only
           * happen if the database is corrupted. There is not much more that we
           * can do at this point except trying to continue with the next
           * record. In such case, it is therefore possible that we miss some
           * changes. TODO. log an error message. TODO : REPAIR : Such problem
           * should be handled by the repair functionality.
           */
        }
      }
@@ -859,7 +892,7 @@
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        String csnString = new String(key.getData(), "UTF-8");
        String csnString = decodeUTF8(key.getData());
        cn = new ChangeNumber(csnString);
        if (cn.getServerId() != 0)
        {
@@ -900,7 +933,7 @@
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
        cn = new ChangeNumber(decodeUTF8(key.getData()));
      }
      else
      {
@@ -915,7 +948,7 @@
      }
      while (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
        cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        {
          // regular change record
@@ -952,18 +985,6 @@
        }
      }
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (DataFormatException e)
    {
      // Should never happen
    }
    finally
    {
      if (cursor != null)
@@ -975,7 +996,7 @@
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
          // can't do much more. The ReplicationServer is shutting down.
        }
      }
    }
@@ -996,33 +1017,22 @@
   * Decode the provided database entry as a the value of a counter.
   * @param entry The provided entry.
   * @return The counter value.
   * @throws DataFormatException
   */
  private static int decodeCounterValue(byte[] entry)
  throws DataFormatException
  {
    try
    {
      String numAckStr = new String(entry, 0, entry.length, "UTF-8");
      return Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    String numAckStr = decodeUTF8(entry);
    return Integer.parseInt(numAckStr);
  }
  /**
   * Encode the provided counter value in a database entry.
   * @param entry The provided entry.
   * @return The databse entry with the counter value encoded inside..
   * @throws UnsupportedEncodingException
   * @return The database entry with the counter value encoded inside.
   */
  static private DatabaseEntry encodeCounterValue(int value)
  throws UnsupportedEncodingException
  {
    DatabaseEntry entry = new DatabaseEntry();
    entry.setData(String.valueOf(value).getBytes("UTF-8"));
    entry.setData(getBytes(String.valueOf(value)));
    return entry;
  }
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -32,6 +32,8 @@
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.DebugLogLevel;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -131,9 +133,9 @@
   */
  private void start() throws DatabaseException, ReplicationDBException
  {
    Cursor cursor = stateDb.openCursor(null, null);
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    Cursor cursor = stateDb.openCursor(null, null);
    try
    {
@@ -259,7 +261,14 @@
    }
    finally
    {
      cursor.close();
      try
      {
        cursor.close();
      }
      catch (Exception ignored)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
      }
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1452,33 +1452,6 @@
  }
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the resources
   * and locks used by the ReplicationIterator.
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getIterator(int serverId,
    ChangeNumber changeNumber)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
      return null;
    try
    {
      ReplicationIterator it = handler.generateIterator(changeNumber);
      return it;
    } catch (Exception e)
    {
      return null;
    }
  }
  /**
   * Returns the change count for that ReplicationServerDomain.
   *
   * @return the change count.
opends/src/server/org/opends/server/util/StaticUtils.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.util;
@@ -31,13 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -170,6 +165,61 @@
  /**
   * Returns the provided byte array decoded as a UTF-8 string without throwing
   * an UnsupportedEncodingException. This method is equivalent to:
   *
   * <pre>
   * try
   * {
   *   return new String(bytes, &quot;UTF-8&quot;);
   * }
   * catch (UnsupportedEncodingException e)
   * {
   *   // Should never happen: UTF-8 is always supported.
   *   throw new RuntimeException(e);
   * }
   * </pre>
   *
   * @param bytes
   *          The byte array to be decoded as a UTF-8 string.
   * @return The decoded string.
   */
  public static String decodeUTF8(final byte[] bytes)
  {
    Validator.ensureNotNull(bytes);
    if (bytes.length == 0)
    {
      return "".intern();
    }
    final StringBuilder builder = new StringBuilder(bytes.length);
    final int sz = bytes.length;
    for (int i = 0; i < sz; i++)
    {
      final byte b = bytes[i];
      if ((b & 0x7f) != b)
      {
        try
        {
          builder.append(new String(bytes, i, (sz - i), "UTF-8"));
        }
        catch (UnsupportedEncodingException e)
        {
          // Should never happen: UTF-8 is always supported.
          throw new RuntimeException(e);
        }
        break;
      }
      builder.append((char) b);
    }
    return builder.toString();
  }
  /**
   * Construct a byte array containing the UTF-8 encoding of the
   * provided <code>char</code> array.
   *
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -126,26 +127,31 @@
      assertEquals(handler.getLastKey(), sn3);
      DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
      assertEquals(dbc.currentChangeNumber(), changeNumber1);
      assertEquals(dbc.currentServiceID(), serviceID1);
      assertEquals(dbc.currentValue(), value1);
      assertTrue(dbc.toString().length() != 0);
      try
      {
        assertEquals(dbc.currentChangeNumber(), changeNumber1);
        assertEquals(dbc.currentServiceID(), serviceID1);
        assertEquals(dbc.currentValue(), value1);
        assertTrue(dbc.toString().length() != 0);
      assertTrue(dbc.next());
        assertTrue(dbc.next());
      assertEquals(dbc.currentChangeNumber(), changeNumber2);
      assertEquals(dbc.currentServiceID(), serviceID2);
      assertEquals(dbc.currentValue(), value2);
        assertEquals(dbc.currentChangeNumber(), changeNumber2);
        assertEquals(dbc.currentServiceID(), serviceID2);
        assertEquals(dbc.currentValue(), value2);
      assertTrue(dbc.next());
        assertTrue(dbc.next());
      assertEquals(dbc.currentChangeNumber(), changeNumber3);
      assertEquals(dbc.currentServiceID(), serviceID3);
      assertEquals(dbc.currentValue(), value3);
        assertEquals(dbc.currentChangeNumber(), changeNumber3);
        assertEquals(dbc.currentServiceID(), serviceID3);
        assertEquals(dbc.currentValue(), value3);
      assertFalse(dbc.next());
      handler.releaseReadCursor(dbc);
        assertFalse(dbc.next());
      }
      finally
      {
        handler.releaseReadCursor(dbc);
      }
      handler.setPurgeDelay(100);
@@ -258,25 +264,43 @@
      assertEquals(handler.getValue(sn3),value3);
      DraftCNDbIterator it = handler.generateIterator(sn1);
      assertEquals(it.getDraftCN(),sn1);
      assertTrue(it.next());
      assertEquals(it.getDraftCN(),sn2);
      assertTrue(it.next());
      assertEquals(it.getDraftCN(),sn3);
      assertFalse(it.next());
      it.releaseCursor();
      try
      {
        assertEquals(it.getDraftCN(), sn1);
        assertTrue(it.next());
        assertEquals(it.getDraftCN(), sn2);
        assertTrue(it.next());
        assertEquals(it.getDraftCN(), sn3);
        assertFalse(it.next());
      }
      finally
      {
        it.releaseCursor();
      }
      it = handler.generateIterator(sn2);
      assertEquals(it.getDraftCN(),sn2);
      assertTrue(it.next());
      assertEquals(it.getDraftCN(),sn3);
      assertFalse(it.next());
      it.releaseCursor();
      try
      {
        assertEquals(it.getDraftCN(), sn2);
        assertTrue(it.next());
        assertEquals(it.getDraftCN(), sn3);
        assertFalse(it.next());
      }
      finally
      {
        it.releaseCursor();
      }
      it = handler.generateIterator(sn3);
      assertEquals(it.getDraftCN(),sn3);
      assertFalse(it.next());
      it.releaseCursor();
      try
      {
        assertEquals(it.getDraftCN(), sn3);
        assertFalse(it.next());
      }
      finally
      {
        it.releaseCursor();
      }
      // Clear ...
      handler.clear();
opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.util;
@@ -138,6 +139,21 @@
  }
  /**
   * Tests the {@link StaticUtils#decodeUTF8(byte[])} method.
   *
   * @param inputString
   *          The input string.
   * @throws Exception
   *           If the test failed unexpectedly.
   */
  @Test(dataProvider = "getBytesTestData")
  public void testDecodeUTF8(String inputString) throws Exception
  {
    final byte[] bytes = inputString.getBytes("UTF-8");
    Assert.assertEquals(StaticUtils.decodeUTF8(bytes), inputString);
  }
  /**
   * Tests the {@link StaticUtils#getBytes(String)} method.
   *
   * @param inputString
@@ -1174,7 +1190,7 @@
   *           If the test failed unexpectedly.
   */
  @Test(dataProvider = "listsAreEqualTestData")
  public void testListsAreEqual(List list1, List list2, boolean result)
  public void testListsAreEqual(List<?> list1, List<?> list2, boolean result)
      throws Exception {
    Assert.assertEquals(StaticUtils.listsAreEqual(list1, list2), result);
  }