mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.33.2011 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d
Fix OPENDJ-184: Transient errors when accessing cn=changelog DraftCN DB result in complete shutdown of the replication service

See issue for more info.
8 files modified
752 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 104 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDB.java 183 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNData.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 80 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 277 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationData.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 50 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 23 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,7 +51,6 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
/**
 * This class is used for managing the replicationServer database for each
@@ -111,9 +110,6 @@
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  private long latestTrimDate = 0;
  /**
@@ -122,7 +118,7 @@
   * are older than this age are removed.
   *
   */
  private long trimage;
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -143,7 +139,7 @@
    this.replicationServer = replicationServer;
    serverId = id;
    this.baseDn = baseDn;
    trimage = replicationServer.getTrimage();
    trimAge = replicationServer.getTrimAge();
    queueMaxSize = queueSize;
    queueLowmark = queueSize * 1 / 5;
    queueHimark = queueSize * 4 / 5;
@@ -291,43 +287,6 @@
  }
  /**
   * Return the number of changes between 2 provided change numbers.
   * @param from The lower (older) change number.
   * @param to   The upper (newer) change number.
   * @return The computed number of changes.
   */
  public int traverseAndCount(ChangeNumber from, ChangeNumber to)
  {
    int count = 0;
    flush();
    ReplServerDBCursor cursor = null;
    try
    {
      try
      {
        cursor = db.openReadCursor(from);
      }
      catch(Exception e)
      {
        return 0;
      }
      ChangeNumber curr = null;
      while ((curr = cursor.nextChangeNumber())!=null)
      {
        if (curr.newerOrEquals(to))
          break;
        count++;
      }
    }
    finally
    {
      if (cursor != null)
        cursor.abort();
    }
    return count;
  }
  /**
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
@@ -456,80 +415,67 @@
   */
  private void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
    if (trimAge == 0)
    {
      return;
    int size = 0;
    boolean finished = false;
    boolean done = false;
    }
    latestTrimDate = TimeThread.getTime() - trimage;
    latestTrimDate = TimeThread.getTime() - trimAge;
    ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
    // Find the last changeNumber before the trimDate, in the Database.
    ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
    ChangeNumber lastBeforeTrimDate = db
        .getPreviousChangeNumber(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    for (int i = 0; i < 100; i++)
    {
      synchronized (flushLock)
      {
        /* the trim is done by group in order to save some CPU and IO bandwidth
        /*
         * the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         */
        ReplServerDBCursor cursor = db.openDeleteCursor();
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        {
          while ((size < 5000 ) &&  (!finished))
          for (int j = 0; j < 50; j++)
          {
            ChangeNumber changeNumber = cursor.nextChangeNumber();
            if (changeNumber != null)
            if (changeNumber == null)
            {
              cursor.close();
              done = true;
              return;
            }
              if ((!changeNumber.equals(lastChange))
                  && (changeNumber.older(trimDate)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstChange = changeNumber;
                finished = true;
              }
            }
            else
              finished = true;
          }
          cursor.close();
          done = true;
              return;
        }
        catch (LockConflictException e)
        {
          cursor.abort();
          if (tries == DEADLOCK_RETRIES)
          {
            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
            // shutdown the ReplicationServer.
            shutdown = true;
            throw (e);
          }
          cursor.close();
        }
        catch (Exception e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          shutdown = true;
          cursor.abort();
          throw (e);
          shutdown = true;
          throw e;
        }
      }
    }
@@ -644,7 +590,7 @@
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
    trimAge = delay;
  }
  /**
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -32,7 +32,6 @@
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.MessageBuilder;
@@ -40,14 +39,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
@@ -61,9 +53,6 @@
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
@@ -103,71 +92,41 @@
  public void addEntry(int draftCN, String value, String domainBaseDN,
      ChangeNumber changeNumber)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;
      DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
      DatabaseEntry data = new DraftCNData(draftCN,
          value, domainBaseDN, changeNumber);
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
      // Use a transaction so that we can override durability.
      Transaction txn = null;
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
          DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
          DatabaseEntry data = new DraftCNData(draftCN,
              value, domainBaseDN, changeNumber);
          db.put(txn, key, data);
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (LockConflictException e)
        {
          // Try again.
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
        }
        finally
        {
          if (txn != null)
          {
            // No effect if txn has committed.
          try
          {
            txn.abort();
            txn = null;
          }
          catch (Exception e)
          {
            // Ignored.
          }
          }
          dbCloseLock.readLock().unlock();
        }
      }
      if (!done)
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
    }
  }
@@ -229,13 +188,21 @@
  }
  private void closeLockedCursor(Cursor cursor)
  throws DatabaseException
  {
    try
    {
      if (cursor != null)
      {
        try
        {
        cursor.close();
    }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
    }
    finally
    {
      dbCloseLock.readLock().unlock();
@@ -248,12 +215,10 @@
   */
  public int readFirstDraftCN()
  {
    Cursor cursor = null;
    String str = null;
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      {
        cursor = db.openCursor(null, null);
@@ -265,9 +230,8 @@
          /* database is empty */
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
        return new Integer(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -277,20 +241,7 @@
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return 0;
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -318,12 +269,10 @@
   */
  public int readLastDraftCN()
  {
    Cursor cursor = null;
    String str = null;
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      {
        cursor = db.openCursor(null, null);
@@ -335,9 +284,8 @@
          /* database is empty */
          return 0;
        }
        str = decodeUTF8(key.getData());
        int sn = new Integer(str);
        return sn;
        return new Integer(decodeUTF8(key.getData()));
      }
      finally
      {
@@ -346,16 +294,7 @@
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return 0;
    }
    catch (Exception e)
    {
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
      return 0;
    }
  }
@@ -519,20 +458,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      if (txn != null)
      {
@@ -540,20 +466,11 @@
        {
          txn.commit();
        }
        catch (Exception e)
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -574,26 +491,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      if (txn != null)
      {
@@ -601,20 +499,11 @@
        {
          txn.abort();
        }
        catch (Exception e)
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,10 +23,12 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2010 ForgeRock AS.
 *      Portions Copyright 2010-2011 ForgeRock AS.
 */
package org.opends.server.replication.server;
import static org.opends.server.util.StaticUtils.getBytes;
import java.io.UnsupportedEncodingException;
import org.opends.messages.Message;
@@ -53,29 +55,14 @@
   * @param value The value (cookie).
   * @param serviceID The serviceID (domain DN).
   * @param changeNumber The replication change number.
   *
   * @throws UnsupportedEncodingException When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public DraftCNData(int draftCN, String value,
      String serviceID, ChangeNumber changeNumber)
  throws UnsupportedEncodingException
  {
    String record = value
                   + FIELD_SEPARATOR + serviceID
                   + FIELD_SEPARATOR + changeNumber;
    byte[] byteValue;
    try
    {
      byteValue = record.getBytes("UTF-8");
      this.setData(byteValue);
    }
    catch (UnsupportedEncodingException e)
    {
      // can't happen
      return;
    }
    setData(getBytes(record));
  }
  /**
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,7 +50,6 @@
import org.opends.server.types.InitializationException;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
/**
 * This class is used for managing the replicationServer database for each
@@ -84,16 +83,13 @@
  private DirectoryThread thread = null;
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimage;
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -108,7 +104,7 @@
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.trimage = replicationServer.getTrimage();
    this.trimAge = replicationServer.getTrimAge();
    // DB initialization
    db = new DraftCNDB(replicationServer, dbenv);
@@ -312,7 +308,7 @@
   */
  public void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
    if (trimAge == 0)
      return;
    clear(null);
@@ -333,103 +329,83 @@
  throws DatabaseException, Exception
  {
    if (this.count()==0)
    {
      return;
    }
    int size = 0;
    int tries = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber crossDomainEligibleCN = replicationServer
        .getEligibleCN();
    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    for (int i = 0; i < 100; i++)
    {
      DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        while ((size < 5000 ) &&  (!finished))
        for (int j = 0; j < 50; j++)
        {
          // let's traverse the DraftCNDb
          if (!cursor.next())
          {
            finished=true;
            cursor.close();
            return;
          }
          else
          {
            ChangeNumber cn = cursor.currentChangeNumber();
            // From the draftCNDb change record, get the domain and changeNumber
            String serviceID = cursor.currentServiceID();
            if ((serviceIDToClear!=null) &&
                (serviceIDToClear.equalsIgnoreCase(serviceID)))
          if ((serviceIDToClear != null)
              && (serviceIDToClear.equalsIgnoreCase(serviceID)))
            {
              size++;
              cursor.delete();
              continue;
            }
            ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(serviceID, false);
          ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(serviceID, false);
            if (domain==null)
            {
              // the domain has been removed since the record was written in the
              // draftCNDb, thus it makes no sense to keep the record in the
              // draftCNDb.
              size++;
              cursor.delete();
            continue;
            }
            else
            {
              ServerState startState = domain.getStartState();
              // We don't use the returned endState but it's updating CN as
              // reading
              domain.getEligibleState(crossDomainEligibleCN);
              ChangeNumber fcn = startState.getMaxChangeNumber(
                  cn.getServerId());
          ChangeNumber fcn = startState.getMaxChangeNumber(cn
              .getServerId());
              int currentKey = cursor.currentKey();
              // Do not delete the lastKey. This should allow us to
              // preserve last change number over time.
              if ((currentKey != lastkey) && (cn.older(fcn)))
              {
                size++;
                cursor.delete();
            continue;
              }
              else
              {
                firstkey = currentKey;
                finished = true;
              }
            }
          }
        }
        cursor.close();
        done = true;
          return;
      }
      catch (LockConflictException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw e;
        }
        cursor.close();
      }
      catch (Exception e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        shutdown = true;
        throw e;
      }
    }
@@ -492,7 +468,7 @@
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
    trimAge = delay;
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
import java.io.UnsupportedEncodingException;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
/**
 * This class implements the interface between the underlying database
@@ -64,9 +56,6 @@
  private int serverId;
  private String baseDn;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
@@ -180,34 +169,22 @@
   */
  public void addEntries(List<UpdateMsg> changes)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
          for (UpdateMsg change : changes)
          {
            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
          DatabaseEntry key = new ReplicationKey(
              change.getChangeNumber());
            DatabaseEntry data = new ReplicationData(change);
            if ((counterCurrValue!=0) &&
                (counterCurrValue%counterWindowSize == 0))
          if ((counterCurrValue != 0)
              && (counterCurrValue % counterWindowSize == 0))
            {
              // enough changes to generate a counter record - wait for the next
              // change fo time
            // change of time
              counterTsLimit = change.getChangeNumber().getTime();
            }
            if ((counterTsLimit!=0)
@@ -215,63 +192,25 @@
            {
              // Write the counter record
              DatabaseEntry counterKey = new ReplicationKey(
                  new ChangeNumber(
                  change.getChangeNumber().getTime(),
                new ChangeNumber(change.getChangeNumber().getTime(),
                  0, 0));
              DatabaseEntry counterValue =
                encodeCounterValue(counterCurrValue-1);
              db.put(txn, counterKey, counterValue);
            db.put(null, counterKey, counterValue);
              counterTsLimit=0;
            }
            db.put(txn, key, data);
          db.put(null, key, data);
            counterCurrValue++;
          }
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (LockConflictException e)
        {
          // Try again.
        }
        finally
        {
          if (txn != null)
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
          }
          dbCloseLock.readLock().unlock();
        }
      }
      if (!done)
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      replicationServer.handleUnexpectedDatabaseException(e);
    }
  }
@@ -334,14 +273,25 @@
    return new ReplServerDBCursor();
  }
  private void closeLockedCursor(Cursor cursor)
    throws DatabaseException
  {
    try
    {
      if (cursor != null)
      {
        try
        {
        cursor.close();
    }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
    }
    finally
    {
      dbCloseLock.readLock().unlock();
@@ -358,6 +308,8 @@
    String str = null;
    ChangeNumber cn = null;
    try
    {
    dbCloseLock.readLock().lock();
    try
    {
@@ -366,7 +318,8 @@
      cursor = db.openCursor(null, null);
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
        OperationStatus status = cursor.getFirst(key, data,
            LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
@@ -391,20 +344,17 @@
        }
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    return cn;
  }
@@ -420,6 +370,8 @@
    Cursor cursor = null;
    ChangeNumber cn = null;
    try
    {
    dbCloseLock.readLock().lock();
    try
    {
@@ -441,12 +393,12 @@
      cn = new ChangeNumber(str);
      if (ReplicationDB.isaCounter(cn))
      {
        if (cursor.getPrev(key, data,
            LockMode.DEFAULT) != OperationStatus.SUCCESS)
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
             * database only contain a counter record - don't know how much it
             * can be possible but ...
           */
          cn = null;
        }
@@ -458,19 +410,16 @@
        }
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    return cn;
  }
@@ -496,26 +445,26 @@
    DatabaseEntry key = new ReplicationKey(changeNumber);
    DatabaseEntry data = new DatabaseEntry();
    Transaction txn = null;
    try
    {
    dbCloseLock.readLock().lock();
    try
    {
      cursor = db.openCursor(txn, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
              OperationStatus.SUCCESS)
        cursor = db.openCursor(null, null);
        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
            == OperationStatus.SUCCESS)
      {
        // We can move close to the changeNumber.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data,
                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
            {
              // database starts with a counter record.
              cn = null;
@@ -534,15 +483,15 @@
      {
        // We could not move the cursor past to the changeNumber
        // Check if the last change is older than changeNumber
        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
          if (cursor.getLast(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data,
              LockMode.DEFAULT) != OperationStatus.SUCCESS)
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
            {
              /*
               * database only contain a counter record, should not be
@@ -560,21 +509,16 @@
        }
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      // TODO: Verify if shutting down replication is the right thing to do
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    return cn;
  }
@@ -669,14 +613,7 @@
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        throw e;
      }
    }
@@ -703,14 +640,7 @@
      }
      catch (Exception e)
      {
        try
        {
          closeLockedCursor(localCursor);
        }
        catch (Exception ignore)
        {
          // Ignore.
        }
        if (localTxn != null)
        {
@@ -741,41 +671,19 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      if (txn != null)
      {
        try
        {
          txn.commit();
          // No need for durability when purging.
          txn.commit(Durability.COMMIT_NO_SYNC);
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -796,26 +704,7 @@
        isClosed = true;
      }
      boolean closeHasFailed = false;
      try
      {
        closeLockedCursor(cursor);
      }
      catch (LockConflictException e)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        closeHasFailed = true;
      }
      if (txn != null)
      {
@@ -825,18 +714,9 @@
        }
        catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          closeHasFailed = true;
          replicationServer.handleUnexpectedDatabaseException(e);
        }
      }
      if (closeHasFailed)
      {
        replicationServer.shutdown();
      }
    }
    /**
@@ -972,11 +852,12 @@
    int distToCounterRecord1 = 0;
    int distBackToCounterRecord2 = 0;
    int count=0;
    Cursor cursor = null;
    Transaction txn = null;
    OperationStatus status;
    try
    {
      Cursor cursor = null;
      try
      {
      ChangeNumber cn ;
      if ((start==null)&&(stop==null))
@@ -986,7 +867,7 @@
      // or to the stop point.
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(txn, null);
        cursor = db.openCursor(null, null);
      if (start != null)
      {
        key = new ReplicationKey(start);
@@ -1036,10 +917,9 @@
      // Step 2 : from the stop point, traverse db to the next counter record
      // or to the start point.
      txn = null;
      data = new DatabaseEntry();
      key = new ReplicationKey(stop);
      cursor = db.openCursor(txn, null);
        cursor = db.openCursor(null, null);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS)
      {
@@ -1098,18 +978,15 @@
    finally
    {
      if (cursor != null)
        {
        cursor.close();
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shutting down.
        }
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    return count;
  }
opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2010 ForgeRock AS.
 *      Portions Copyright 2010-2011 ForgeRock AS.
 */
package org.opends.server.replication.server;
@@ -47,17 +47,21 @@
   * Creates a new ReplicationData object from an UpdateMsg.
   *
   * @param change the UpdateMsg used to create the ReplicationData.
   *
   * @throws UnsupportedEncodingException When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public ReplicationData(UpdateMsg change)
         throws UnsupportedEncodingException
  {
    // Always keep messages in the replication DB with the current protocol
    // version
    try
    {
    this.setData(change.getBytes());
  }
    catch (UnsupportedEncodingException e)
    {
      // This should not happen - UTF-8 is always available.
      throw new RuntimeException(e);
    }
  }
  /**
   * Generate an UpdateMsg from its byte[] form.
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -40,16 +40,8 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
import java.util.concurrent.TimeUnit;
/**
@@ -92,10 +84,24 @@
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam("je.cleaner.expunge", "true");
    envConfig.setConfigParam("je.cleaner.threads", "2");
    envConfig.setConfigParam("je.checkpointer.highPriority", "true");
    // If the JVM is reasonably large then we can safely default to
    // bigger read buffers. This will result in more scalable checkpointer
    // and cleaner performance.
    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
    {
      envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", String
          .valueOf(2 * 1024 * 1024));
      envConfig.setConfigParam("je.log.iteratorReadSize", String
          .valueOf(2 * 1024 * 1024));
      envConfig.setConfigParam("je.log.faultReadSize", String
          .valueOf(4 * 1024));
    }
    // Tests have shown that since the parsing of the Replication log is always
    // done sequentially, it is not necessary to use a large DB cache.
    // Use 5M so that the replication can be used with 64M total for the JVM.
@@ -103,9 +109,14 @@
    // Since records are always added at the end of the Replication log and
    // deleted at the beginning of the Replication log, this should never
    // cause any deadlock. It is therefore safe to increase the TXN timeout
    // to 10 seconds.
    envConfig.setTxnTimeout(10, TimeUnit.SECONDS);
    // cause any deadlock.
    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
    // Since replication provides durability, we can reduce the DB durability
    // level so that we are immune to application / JVM crashes.
    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
    dbEnvironment = new Environment(new File(path), envConfig);
    /*
@@ -120,7 +131,6 @@
    stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
    start();
  }
  /**
@@ -316,7 +326,7 @@
              TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
                " serverId/Domain=<"+stringId+">");
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
@@ -347,7 +357,7 @@
                  "Created in the state Db record Tag/Domain/GenId key=" +
                  stringId + " value=" + dataStringId);
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
@@ -432,7 +442,7 @@
          try
          {
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
@@ -495,7 +505,7 @@
          try {
            data.setData(byteId);
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
            if (debugEnabled())
              TRACER.debugInfo(
                  " In " + this.replicationServer.getMonitorInstanceName() +
@@ -532,7 +542,7 @@
      {
        txn = dbEnvironment.beginTransaction(null, null);
        dbEnvironment.truncateDatabase(txn, databaseName, false);
        txn.commitWriteNoSync();
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
        txn = null;
      }
      catch (DatabaseException e)
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -32,6 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.ServerConstants.EOL;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -992,7 +993,7 @@
   * @return  The time after which changes must be deleted from the
   *          persistent storage (in milliseconds).
   */
  long getTrimage()
  long getTrimAge()
  {
    return purgeDelay * 1000;
  }
@@ -2002,4 +2003,24 @@
    }
  }
  /**
   * Shuts down replication when an unexpected database exception occurs. Note
   * that we do not expect lock timeouts or txn timeouts because the replication
   * databases are deadlock free, thus all operations should complete
   * eventually.
   *
   * @param e
   *          The unexpected database exception.
   */
  void handleUnexpectedDatabaseException(DatabaseException e)
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
    mb.append(stackTraceToSingleLineString(e));
    logError(mb.toMessage());
    shutdown();
  }
}