mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.33.2011 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d
Fix OPENDJ-184: Transient errors when accessing cn=changelog DraftCN DB result in complete shutdown of the replication service

See issue for more info.
8 files modified
1262 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 120 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDB.java 211 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNData.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 146 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 675 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationData.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 50 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 23 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,7 +51,6 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
/**
 * This class is used for managing the replicationServer database for each
@@ -111,9 +110,6 @@
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  private long latestTrimDate = 0;
  /**
@@ -122,7 +118,7 @@
   * are older than this age are removed.
   *
   */
  private long trimage;
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -143,7 +139,7 @@
    this.replicationServer = replicationServer;
    serverId = id;
    this.baseDn = baseDn;
    trimage = replicationServer.getTrimage();
    trimAge = replicationServer.getTrimAge();
    queueMaxSize = queueSize;
    queueLowmark = queueSize * 1 / 5;
    queueHimark = queueSize * 4 / 5;
@@ -291,43 +287,6 @@
  }
  /**
   * Return the number of changes between 2 provided change numbers.
   * @param from The lower (older) change number.
   * @param to   The upper (newer) change number.
   * @return The computed number of changes.
   */
  public int traverseAndCount(ChangeNumber from, ChangeNumber to)
  {
    int count = 0;
    flush();
    ReplServerDBCursor cursor = null;
    try
    {
      try
      {
        cursor = db.openReadCursor(from);
      }
      catch(Exception e)
      {
        return 0;
      }
      ChangeNumber curr = null;
      while ((curr = cursor.nextChangeNumber())!=null)
      {
        if (curr.newerOrEquals(to))
          break;
        count++;
      }
    }
    finally
    {
      if (cursor != null)
        cursor.abort();
    }
    return count;
  }
  /**
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
@@ -456,80 +415,67 @@
   */
  private void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
    if (trimAge == 0)
    {
      return;
    int size = 0;
    boolean finished = false;
    boolean done = false;
    }
    latestTrimDate = TimeThread.getTime() - trimage;
    latestTrimDate = TimeThread.getTime() - trimAge;
    ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
    // Find the last changeNumber before the trimDate, in the Database.
    ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
    ChangeNumber lastBeforeTrimDate = db
        .getPreviousChangeNumber(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    for (int i = 0; i < 100; i++)
    {
      synchronized (flushLock)
      {
        /* the trim is done by group in order to save some CPU and IO bandwidth
        /*
         * the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         */
        ReplServerDBCursor cursor = db.openDeleteCursor();
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        {
          while ((size < 5000 ) &&  (!finished))
          for (int j = 0; j < 50; j++)
          {
            ChangeNumber changeNumber = cursor.nextChangeNumber();
            if (changeNumber != null)
            if (changeNumber == null)
            {
              if ((!changeNumber.equals(lastChange))
                  && (changeNumber.older(trimDate)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstChange = changeNumber;
                finished = true;
              }
              cursor.close();
              done = true;
              return;
            }
            if ((!changeNumber.equals(lastChange))
                && (changeNumber.older(trimDate)))
            {
              cursor.delete();
            }
            else
              finished = true;
            {
              firstChange = changeNumber;
              cursor.close();
              done = true;
              return;
            }
          }
          cursor.close();
          done = true;
        }
        catch (LockConflictException e)
        {
          cursor.abort();
          if (tries == DEADLOCK_RETRIES)
          {
            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
            // shutdown the ReplicationServer.
            shutdown = true;
            throw (e);
          }
        }
        catch (Exception e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          shutdown = true;
          cursor.abort();
          throw (e);
          shutdown = true;
          throw e;
        }
      }
    }
@@ -644,7 +590,7 @@
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
    trimAge = delay;
  }
  /**
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -32,7 +32,6 @@
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.MessageBuilder;
@@ -40,14 +39,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
@@ -61,9 +53,6 @@
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
@@ -103,71 +92,41 @@
  public void addEntry(int draftCN, String value, String domainBaseDN,
      ChangeNumber changeNumber)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;
      DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
      DatabaseEntry data = new DraftCNData(draftCN,
          value, domainBaseDN, changeNumber);
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      // Use a transaction so that we can override durability.
      Transaction txn = null;
      dbCloseLock.readLock().lock();
      try
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
          DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
          DatabaseEntry data = new DraftCNData(draftCN,
              value, domainBaseDN, changeNumber);
          db.put(txn, key, data);
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (LockConflictException e)
        {
          // Try again.
        }
        finally
        {
          if (txn != null)
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
          }
          dbCloseLock.readLock().unlock();
        }
        txn = dbenv.beginTransaction();
        db.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
      if (!done)
      finally
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        replicationServer.shutdown();
        if (txn != null)
        {
          // No effect if txn has committed.
          try
          {
            txn.abort();
          }
          catch (Exception e)
          {
            // Ignored.
          }
        }
        dbCloseLock.readLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
    }
  }
@@ -229,12 +188,20 @@
  }
  private void closeLockedCursor(Cursor cursor)
  throws DatabaseException
  {
    try
    {
      if (cursor != null)
        cursor.close();
      {
        try
        {
          cursor.close();
        }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
    }
    finally
    {
@@ -248,12 +215,10 @@
   */
  public int readFirstDraftCN()
  {
    Cursor cursor = null;
    String str = null;
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      {
        cursor = db.openCursor(null, null);
@@ -265,9 +230,8 @@
          /* database is empty */
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
        return new Integer(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -277,20 +241,7 @@
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return 0;
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -318,12 +269,10 @@
   */
  public int readLastDraftCN()
  {
    Cursor cursor = null;
    String str = null;
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      {
        cursor = db.openCursor(null, null);
@@ -335,9 +284,8 @@
          /* database is empty */
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
        return new Integer(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -346,16 +294,7 @@
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return 0;
    }
    catch (Exception e)
    {
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -519,20 +458,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
@@ -540,20 +466,11 @@
        {
          txn.commit();
        }
        catch (Exception e)
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -574,26 +491,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
@@ -601,20 +499,11 @@
        {
          txn.abort();
        }
        catch (Exception e)
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,10 +23,12 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2010 ForgeRock AS.
 *      Portions Copyright 2010-2011 ForgeRock AS.
 */
package org.opends.server.replication.server;
import static org.opends.server.util.StaticUtils.getBytes;
import java.io.UnsupportedEncodingException;
import org.opends.messages.Message;
@@ -53,29 +55,14 @@
   * @param value The value (cookie).
   * @param serviceID The serviceID (domain DN).
   * @param changeNumber The replication change number.
   *
   * @throws UnsupportedEncodingException When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public DraftCNData(int draftCN, String value,
      String serviceID, ChangeNumber changeNumber)
  throws UnsupportedEncodingException
  {
    String record = value
                   + FIELD_SEPARATOR + serviceID
                   + FIELD_SEPARATOR + changeNumber;
    byte[] byteValue;
    try
    {
      byteValue = record.getBytes("UTF-8");
      this.setData(byteValue);
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
      return;
    }
    setData(getBytes(record));
  }
  /**
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,7 +50,6 @@
import org.opends.server.types.InitializationException;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
/**
 * This class is used for managing the replicationServer database for each
@@ -84,16 +83,13 @@
  private DirectoryThread thread = null;
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimage;
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -108,7 +104,7 @@
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.trimage = replicationServer.getTrimage();
    this.trimAge = replicationServer.getTrimAge();
    // DB initialization
    db = new DraftCNDB(replicationServer, dbenv);
@@ -312,7 +308,7 @@
   */
  public void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
    if (trimAge == 0)
      return;
    clear(null);
@@ -330,106 +326,86 @@
   *
   */
  public void clear(String serviceIDToClear)
  throws DatabaseException, Exception
      throws DatabaseException, Exception
  {
    if (this.count()==0)
    if (this.count() == 0)
    {
      return;
    }
    int size = 0;
    int tries = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber crossDomainEligibleCN = replicationServer
        .getEligibleCN();
    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    for (int i = 0; i < 100; i++)
    {
      DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        while ((size < 5000 ) &&  (!finished))
        for (int j = 0; j < 50; j++)
        {
          // let's traverse the DraftCNDb
          if (!cursor.next())
          {
            finished=true;
            cursor.close();
            return;
          }
          else
          ChangeNumber cn = cursor.currentChangeNumber();
          // From the draftCNDb change record, get the domain and changeNumber
          String serviceID = cursor.currentServiceID();
          if ((serviceIDToClear != null)
              && (serviceIDToClear.equalsIgnoreCase(serviceID)))
          {
            ChangeNumber cn = cursor.currentChangeNumber();
            // From the draftCNDb change record, get the domain and changeNumber
            String serviceID = cursor.currentServiceID();
            if ((serviceIDToClear!=null) &&
                (serviceIDToClear.equalsIgnoreCase(serviceID)))
            {
              size++;
              cursor.delete();
              continue;
            }
            ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(serviceID, false);
            if (domain==null)
            {
              // the domain has been removed since the record was written in the
              // draftCNDb, thus it makes no sense to keep the record in the
              // draftCNDb.
              size++;
              cursor.delete();
            }
            else
            {
              ServerState startState = domain.getStartState();
              // We don't use the returned endState but it's updating CN as
              // reading
              domain.getEligibleState(crossDomainEligibleCN);
              ChangeNumber fcn = startState.getMaxChangeNumber(
                  cn.getServerId());
              int currentKey = cursor.currentKey();
              // Do not delete the lastKey. This should allow us to
              // preserve last change number over time.
              if ((currentKey != lastkey) && (cn.older(fcn)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstkey = currentKey;
                finished = true;
              }
            }
            cursor.delete();
            continue;
          }
          ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(serviceID, false);
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
            // draftCNDb, thus it makes no sense to keep the record in the
            // draftCNDb.
            cursor.delete();
            continue;
          }
          ServerState startState = domain.getStartState();
          // We don't use the returned endState but it's updating CN as
          // reading
          domain.getEligibleState(crossDomainEligibleCN);
          ChangeNumber fcn = startState.getMaxChangeNumber(cn
              .getServerId());
          int currentKey = cursor.currentKey();
          // Do not delete the lastKey. This should allow us to
          // preserve last change number over time.
          if ((currentKey != lastkey) && (cn.older(fcn)))
          {
            cursor.delete();
            continue;
          }
          firstkey = currentKey;
          cursor.close();
          return;
        }
        cursor.close();
        done = true;
      }
      catch (LockConflictException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw e;
        }
      }
      catch (Exception e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        shutdown = true;
        throw e;
      }
    }
@@ -492,7 +468,7 @@
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
    trimAge = delay;
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
import java.io.UnsupportedEncodingException;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
@@ -64,9 +56,6 @@
  private int serverId;
  private String baseDn;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
@@ -180,98 +169,48 @@
   */
  public void addEntries(List<UpdateMsg> changes)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      dbCloseLock.readLock().lock();
      try
      {
        dbCloseLock.readLock().lock();
        try
        for (UpdateMsg change : changes)
        {
          txn = dbenv.beginTransaction();
          DatabaseEntry key = new ReplicationKey(
              change.getChangeNumber());
          DatabaseEntry data = new ReplicationData(change);
          for (UpdateMsg change : changes)
          if ((counterCurrValue != 0)
              && (counterCurrValue % counterWindowSize == 0))
          {
            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
            DatabaseEntry data = new ReplicationData(change);
            if ((counterCurrValue!=0) &&
                (counterCurrValue%counterWindowSize == 0))
            {
              // enough changes to generate a counter record - wait for the next
              // change fo time
              counterTsLimit = change.getChangeNumber().getTime();
            }
            if ((counterTsLimit!=0)
                && (change.getChangeNumber().getTime() != counterTsLimit))
            {
              // Write the counter record
              DatabaseEntry counterKey = new ReplicationKey(
                  new ChangeNumber(
                  change.getChangeNumber().getTime(),
                  0, 0));
              DatabaseEntry counterValue =
                encodeCounterValue(counterCurrValue-1);
              db.put(txn, counterKey, counterValue);
              counterTsLimit=0;
            }
            db.put(txn, key, data);
            counterCurrValue++;
            // enough changes to generate a counter record - wait for the next
            // change of time
            counterTsLimit = change.getChangeNumber().getTime();
          }
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (LockConflictException e)
        {
          // Try again.
        }
        finally
        {
          if (txn != null)
          if ((counterTsLimit != 0)
              && (change.getChangeNumber().getTime() != counterTsLimit))
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
            // Write the counter record
            DatabaseEntry counterKey = new ReplicationKey(
                new ChangeNumber(change.getChangeNumber().getTime(),
                    0, 0));
            DatabaseEntry counterValue =
              encodeCounterValue(counterCurrValue - 1);
            db.put(null, counterKey, counterValue);
            counterTsLimit = 0;
          }
          dbCloseLock.readLock().unlock();
          db.put(null, key, data);
          counterCurrValue++;
        }
      }
      if (!done)
      finally
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        replicationServer.shutdown();
        dbCloseLock.readLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
    }
  }
@@ -334,13 +273,24 @@
    return new ReplServerDBCursor();
  }
  private void closeLockedCursor(Cursor cursor)
    throws DatabaseException
      throws DatabaseException
  {
    try
    {
      if (cursor != null)
        cursor.close();
      {
        try
        {
          cursor.close();
        }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
    }
    finally
    {
@@ -358,53 +308,53 @@
    String str = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      dbCloseLock.readLock().lock();
      try
      {
        /* database is empty */
        return null;
      }
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
      str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        // First record is a counter record .. go next
        status = cursor.getNext(key, data, LockMode.DEFAULT);
        cursor = db.openCursor(null, null);
        OperationStatus status = cursor.getFirst(key, data,
            LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          // DB contains only a counter record
          /* database is empty */
          return null;
        }
        else
        str = decodeUTF8(key.getData());
        cn = new ChangeNumber(str);
        if (ReplicationDB.isaCounter(cn))
        {
          cn = new ChangeNumber(decodeUTF8(key.getData()));
          // First record is a counter record .. go next
          status = cursor.getNext(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          {
            // DB contains only a counter record
            return null;
          }
          else
          {
            cn = new ChangeNumber(decodeUTF8(key.getData()));
          }
        }
      }
      finally
      {
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -420,57 +370,56 @@
    Cursor cursor = null;
    ChangeNumber cn = null;
    dbCloseLock.readLock().lock();
    try
    {
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getLast(key, data,
          LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      dbCloseLock.readLock().lock();
      try
      {
        /* database is empty */
        return null;
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        OperationStatus status = cursor.getLast(key, data,
            LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return null;
        }
        String str = decodeUTF8(key.getData());
        cn = new ChangeNumber(str);
        if (ReplicationDB.isaCounter(cn))
        {
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              != OperationStatus.SUCCESS)
          {
            /*
             * database only contain a counter record - don't know how much it
             * can be possible but ...
             */
            cn = null;
          }
          else
          {
            str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            // There can't be 2 counter record next to each other
          }
        }
      }
      String str = decodeUTF8(key.getData());
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      finally
      {
        if (cursor.getPrev(key, data,
            LockMode.DEFAULT) != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
           */
          cn = null;
        }
        else
        {
          str = decodeUTF8(key.getData());
          cn= new ChangeNumber(str);
          // There can't be 2 counter record next to each other
        }
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -496,85 +445,80 @@
    DatabaseEntry key = new ReplicationKey(changeNumber);
    DatabaseEntry data = new DatabaseEntry();
    Transaction txn = null;
    dbCloseLock.readLock().lock();
    try
    {
      cursor = db.openCursor(txn, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
              OperationStatus.SUCCESS)
      dbCloseLock.readLock().lock();
      try
      {
        // We can move close to the changeNumber.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
        cursor = db.openCursor(null, null);
        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
            == OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          // We can move close to the changeNumber.
          // Let's move to the previous change.
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            if (cursor.getPrev(key, data,
                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
            String str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            if (ReplicationDB.isaCounter(cn))
            {
              // database starts with a counter record.
              cn = null;
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
              {
                // database starts with a counter record.
                cn = null;
              }
              else
              {
                str = decodeUTF8(key.getData());
                cn = new ChangeNumber(str);
                // There can't be 2 counter record next to each other
              }
            }
            else
          }
          // else, there was no change previous to our changeNumber.
        }
        else
        {
          // We could not move the cursor past to the changeNumber
          // Check if the last change is older than changeNumber
          if (cursor.getLast(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            String str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            if (ReplicationDB.isaCounter(cn))
            {
              str = decodeUTF8(key.getData());
              cn= new ChangeNumber(str);
              // There can't be 2 counter record next to each other
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
              {
                /*
                 * database only contain a counter record, should not be
                 * possible, but Ok, let's just say no change Number
                 */
                cn = null;
              }
              else
              {
                str = decodeUTF8(key.getData());
                cn = new ChangeNumber(str);
                // There can't be 2 counter record next to each other
              }
            }
          }
        }
        // else, there was no change previous to our changeNumber.
      }
      else
      finally
      {
        // We could not move the cursor past to the changeNumber
        // Check if the last change is older than changeNumber
        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data,
              LockMode.DEFAULT) != OperationStatus.SUCCESS)
            {
              /*
               * database only contain a counter record, should not be
               * possible, but Ok, let's just say no change Number
               */
              cn = null;
            }
            else
            {
              str = decodeUTF8(key.getData());
              cn= new ChangeNumber(str);
              // There can't be 2 counter record next to each other
            }
          }
        }
        closeLockedCursor(cursor);
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      // TODO: Verify if shutting down replication is the right thing to do
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
@@ -669,14 +613,7 @@
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        closeLockedCursor(localCursor);
        throw e;
      }
    }
@@ -703,14 +640,7 @@
      }
      catch (Exception e)
      {
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        closeLockedCursor(localCursor);
        if (localTxn != null)
        {
@@ -741,41 +671,19 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
        try
        {
          txn.commit();
          // No need for durability when purging.
          txn.commit(Durability.COMMIT_NO_SYNC);
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -796,26 +704,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
@@ -825,18 +714,9 @@
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -972,143 +852,140 @@
    int distToCounterRecord1 = 0;
    int distBackToCounterRecord2 = 0;
    int count=0;
    Cursor cursor = null;
    Transaction txn = null;
    OperationStatus status;
    try
    {
      ChangeNumber cn ;
      if ((start==null)&&(stop==null))
        return (int)db.count();
      // Step 1 : from the start point, traverse db to the next counter record
      // or to the stop point.
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(txn, null);
      if (start != null)
      Cursor cursor = null;
      try
      {
        key = new ReplicationKey(start);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
      }
      else
      {
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
        ChangeNumber cn ;
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        String csnString = decodeUTF8(key.getData());
        cn = new ChangeNumber(csnString);
        if (cn.getServerId() != 0)
        if ((start==null)&&(stop==null))
          return (int)db.count();
        // Step 1 : from the start point, traverse db to the next counter record
        // or to the stop point.
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        if (start != null)
        {
          // reached a regular change record
          // test whether we reached the 'stop' target
          if (!cn.newer(stop))
          {
            // let's loop
            distToCounterRecord1++;
            status = cursor.getNext(key, data, LockMode.DEFAULT);
          }
          else
          {
            // reached the end
            break;
          }
          key = new ReplicationKey(start);
          status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
          if (status == OperationStatus.NOTFOUND)
            status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
        }
        else
        {
          // counter record
          counterRecord1 = decodeCounterValue(data.getData());
          break;
          status = cursor.getNext(key, data, LockMode.DEFAULT);
        }
      }
      cursor.close();
      // cases
      //
      if (counterRecord1==0)
        return distToCounterRecord1;
        while (status == OperationStatus.SUCCESS)
        {
          // test whether the record is a regular change or a counter
          String csnString = decodeUTF8(key.getData());
          cn = new ChangeNumber(csnString);
          if (cn.getServerId() != 0)
          {
            // reached a regular change record
            // test whether we reached the 'stop' target
            if (!cn.newer(stop))
            {
              // let's loop
              distToCounterRecord1++;
              status = cursor.getNext(key, data, LockMode.DEFAULT);
            }
            else
            {
              // reached the end
              break;
            }
          }
          else
          {
            // counter record
            counterRecord1 = decodeCounterValue(data.getData());
            break;
          }
        }
        cursor.close();
      // Step 2 : from the stop point, traverse db to the next counter record
      // or to the start point.
      txn = null;
      data = new DatabaseEntry();
      key = new ReplicationKey(stop);
      cursor = db.openCursor(txn, null);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(decodeUTF8(key.getData()));
      }
      else
      {
        key = new DatabaseEntry();
        // cases
        //
        if (counterRecord1==0)
          return distToCounterRecord1;
        // Step 2 : from the stop point, traverse db to the next counter record
        // or to the start point.
        data = new DatabaseEntry();
        status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        key = new ReplicationKey(stop);
        cursor = db.openCursor(null, null);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.SUCCESS)
        {
          /* database is empty */
          return 0;
          cn = new ChangeNumber(decodeUTF8(key.getData()));
        }
      }
      while (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        else
        {
          // regular change record
          if (!cn.older(start))
          key = new DatabaseEntry();
          data = new DatabaseEntry();
          status = cursor.getLast(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          {
            distBackToCounterRecord2++;
            status = cursor.getPrev(key, data, LockMode.DEFAULT);
            /* database is empty */
            return 0;
          }
        }
        while (status == OperationStatus.SUCCESS)
        {
          cn = new ChangeNumber(decodeUTF8(key.getData()));
          if (!ReplicationDB.isaCounter(cn))
          {
            // regular change record
            if (!cn.older(start))
            {
              distBackToCounterRecord2++;
              status = cursor.getPrev(key, data, LockMode.DEFAULT);
            }
            else
              break;
          }
          else
          {
            // counter record
            counterRecord2 = decodeCounterValue(data.getData());
            break;
          }
        }
        else
        cursor.close();
        // Step 3 : Now consolidates the result
        if (counterRecord1!=0)
        {
          // counter record
          counterRecord2 = decodeCounterValue(data.getData());
          break;
          if (counterRecord1 == counterRecord2)
          {
            // only one cp between from and to - no need to use it
            count = distToCounterRecord1 + distBackToCounterRecord2;
          }
          else
          {
            // 2 cp between from and to
            count = distToCounterRecord1 + (counterRecord2-counterRecord1)
            + distBackToCounterRecord2;
          }
        }
      }
      cursor.close();
      // Step 3 : Now consolidates the result
      if (counterRecord1!=0)
      finally
      {
        if (counterRecord1 == counterRecord2)
        if (cursor != null)
        {
          // only one cp between from and to - no need to use it
          count = distToCounterRecord1 + distBackToCounterRecord2;
        }
        else
        {
          // 2 cp between from and to
          count = distToCounterRecord1 + (counterRecord2-counterRecord1)
            + distBackToCounterRecord2;
          cursor.close();
        }
      }
    }
    finally
    catch (DatabaseException e)
    {
      if (cursor != null)
        cursor.close();
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shutting down.
        }
      }
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    return count;
  }
opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2010 ForgeRock AS.
 *      Portions Copyright 2010-2011 ForgeRock AS.
 */
package org.opends.server.replication.server;
@@ -47,16 +47,20 @@
   * Creates a new ReplicationData object from an UpdateMsg.
   *
   * @param change the UpdateMsg used to create the ReplicationData.
   *
   * @throws UnsupportedEncodingException When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public ReplicationData(UpdateMsg change)
         throws UnsupportedEncodingException
  {
    // Always keep messages in the replication DB with the current protocol
    // version
    this.setData(change.getBytes());
    try
    {
      this.setData(change.getBytes());
    }
    catch (UnsupportedEncodingException e)
    {
      // This should not happen - UTF-8 is always available.
      throw new RuntimeException(e);
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -40,16 +40,8 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
import java.util.concurrent.TimeUnit;
/**
@@ -92,10 +84,24 @@
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam("je.cleaner.expunge", "true");
    envConfig.setConfigParam("je.cleaner.threads", "2");
    envConfig.setConfigParam("je.checkpointer.highPriority", "true");
    // If the JVM is reasonably large then we can safely default to
    // bigger read buffers. This will result in more scalable checkpointer
    // and cleaner performance.
    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
    {
      envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", String
          .valueOf(2 * 1024 * 1024));
      envConfig.setConfigParam("je.log.iteratorReadSize", String
          .valueOf(2 * 1024 * 1024));
      envConfig.setConfigParam("je.log.faultReadSize", String
          .valueOf(4 * 1024));
    }
    // Tests have shown that since the parsing of the Replication log is always
    // done sequentially, it is not necessary to use a large DB cache.
    // Use 5M so that the replication can be used with 64M total for the JVM.
@@ -103,9 +109,14 @@
    // Since records are always added at the end of the Replication log and
    // deleted at the beginning of the Replication log, this should never
    // cause any deadlock. It is therefore safe to increase the TXN timeout
    // to 10 seconds.
    envConfig.setTxnTimeout(10, TimeUnit.SECONDS);
    // cause any deadlock.
    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
    // Since replication provides durability, we can reduce the DB durability
    // level so that we are immune to application / JVM crashes.
    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
    dbEnvironment = new Environment(new File(path), envConfig);
    /*
@@ -120,7 +131,6 @@
    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
    start();
  }
  /**
@@ -316,7 +326,7 @@
              TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
                " serverId/Domain=<"+stringId+">");
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
@@ -347,7 +357,7 @@
                  "Created in the state Db record Tag/Domain/GenId key=" +
                  stringId + " value=" + dataStringId);
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
@@ -432,7 +442,7 @@
          try
          {
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
@@ -495,7 +505,7 @@
          try {
            data.setData(byteId);
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
            if (debugEnabled())
              TRACER.debugInfo(
                  " In " + this.replicationServer.getMonitorInstanceName() +
@@ -532,7 +542,7 @@
      {
        txn = dbEnvironment.beginTransaction(null, null);
        dbEnvironment.truncateDatabase(txn, databaseName, false);
        txn.commitWriteNoSync();
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
        txn = null;
      }
      catch (DatabaseException e)
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -32,6 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.ServerConstants.EOL;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -992,7 +993,7 @@
   * @return  The time after which changes must be deleted from the
   *          persistent storage (in milliseconds).
   */
  long getTrimage()
  long getTrimAge()
  {
    return purgeDelay * 1000;
  }
@@ -2002,4 +2003,24 @@
    }
  }
  /**
   * Shuts down replication when an unexpected database exception occurs. Note
   * that we do not expect lock timeouts or txn timeouts because the replication
   * databases are deadlock free, thus all operations should complete
   * eventually.
   *
   * @param e
   *          The unexpected database exception.
   */
  void handleUnexpectedDatabaseException(DatabaseException e)
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
    mb.append(stackTraceToSingleLineString(e));
    logError(mb.toMessage());
    shutdown();
  }
}