From f17749258c94074301cf8fcadb7382b864d4e48b Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 08 Jun 2011 14:33:10 +0000
Subject: [PATCH] Fix OPENDJ-184: Transient errors when accessing cn=changelog DraftCN DB result in complete shutdown of the replication service

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java         |  120 +---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java     |  675 ++++++++++++------------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java |   23 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java   |   16 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java         |  211 ++-------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java  |  146 ++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java  |   50 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java       |   21 
 8 files changed, 486 insertions(+), 776 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index cfb83e4..b147246 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,7 +51,6 @@
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
 
 import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockConflictException;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -111,9 +110,6 @@
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
-  // The maximum number of retries in case of DatabaseDeadlock Exception.
-  private static final int DEADLOCK_RETRIES = 10;
-
   private long latestTrimDate = 0;
 
   /**
@@ -122,7 +118,7 @@
    * are older than this age are removed.
    *
    */
-  private long trimage;
+  private long trimAge;
 
   /**
    * Creates a new dbHandler associated to a given LDAP server.
@@ -143,7 +139,7 @@
     this.replicationServer = replicationServer;
     serverId = id;
     this.baseDn = baseDn;
-    trimage = replicationServer.getTrimage();
+    trimAge = replicationServer.getTrimAge();
     queueMaxSize = queueSize;
     queueLowmark = queueSize * 1 / 5;
     queueHimark = queueSize * 4 / 5;
@@ -291,43 +287,6 @@
   }
 
   /**
-   * Return the number of changes between 2 provided change numbers.
-   * @param from The lower (older) change number.
-   * @param to   The upper (newer) change number.
-   * @return The computed number of changes.
-   */
-  public int traverseAndCount(ChangeNumber from, ChangeNumber to)
-  {
-    int count = 0;
-    flush();
-    ReplServerDBCursor cursor = null;
-    try
-    {
-      try
-      {
-        cursor = db.openReadCursor(from);
-      }
-      catch(Exception e)
-      {
-        return 0;
-      }
-      ChangeNumber curr = null;
-      while ((curr = cursor.nextChangeNumber())!=null)
-      {
-        if (curr.newerOrEquals(to))
-          break;
-        count++;
-      }
-    }
-    finally
-    {
-      if (cursor != null)
-        cursor.abort();
-    }
-    return count;
-  }
-
-  /**
    * Removes the provided number of messages from the beginning of the msgQueue.
    *
    * @param number the number of changes to be removed.
@@ -456,80 +415,67 @@
    */
   private void trim() throws DatabaseException, Exception
   {
-    if (trimage == 0)
+    if (trimAge == 0)
+    {
       return;
-    int size = 0;
-    boolean finished = false;
-    boolean done = false;
+    }
 
-    latestTrimDate = TimeThread.getTime() - trimage;
+    latestTrimDate = TimeThread.getTime() - trimAge;
 
     ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
 
     // Find the last changeNumber before the trimDate, in the Database.
-    ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
+    ChangeNumber lastBeforeTrimDate = db
+        .getPreviousChangeNumber(trimDate);
     if (lastBeforeTrimDate != null)
     {
       // If we found it, we want to stop trimming when reaching it.
       trimDate = lastBeforeTrimDate;
     }
-    // In case of deadlock detection by the Database, this thread can
-    // by aborted by a DeadlockException. This is a transient error and
-    // the transaction should be attempted again.
-    // We will try DEADLOCK_RETRIES times before failing.
-    int tries = 0;
-    while ((tries++ < DEADLOCK_RETRIES) && (!done))
+
+    for (int i = 0; i < 100; i++)
     {
       synchronized (flushLock)
       {
-        /* the trim is done by group in order to save some CPU and IO bandwidth
+        /*
+         * the trim is done by group in order to save some CPU and IO bandwidth
          * start the transaction then do a bunch of remove then commit
          */
-        ReplServerDBCursor cursor = db.openDeleteCursor();
-
+        final ReplServerDBCursor cursor = db.openDeleteCursor();
         try
         {
-          while ((size < 5000 ) &&  (!finished))
+          for (int j = 0; j < 50; j++)
           {
             ChangeNumber changeNumber = cursor.nextChangeNumber();
-            if (changeNumber != null)
+            if (changeNumber == null)
             {
-              if ((!changeNumber.equals(lastChange))
-                  && (changeNumber.older(trimDate)))
-              {
-                size++;
-                cursor.delete();
-              }
-              else
-              {
-                firstChange = changeNumber;
-                finished = true;
-              }
+              cursor.close();
+              done = true;
+              return;
+            }
+
+            if ((!changeNumber.equals(lastChange))
+                && (changeNumber.older(trimDate)))
+            {
+              cursor.delete();
             }
             else
-              finished = true;
+            {
+              firstChange = changeNumber;
+              cursor.close();
+              done = true;
+              return;
+            }
           }
           cursor.close();
-          done = true;
-        }
-        catch (LockConflictException e)
-        {
-          cursor.abort();
-          if (tries == DEADLOCK_RETRIES)
-          {
-            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
-            // shutdown the ReplicationServer.
-            shutdown = true;
-            throw (e);
-          }
         }
         catch (Exception e)
         {
           // mark shutdown for this db so that we don't try again to
           // stop it from cursor.close() or methods called by cursor.close()
-          shutdown = true;
           cursor.abort();
-          throw (e);
+          shutdown = true;
+          throw e;
         }
       }
     }
@@ -644,7 +590,7 @@
    */
   public void setPurgeDelay(long delay)
   {
-    trimage = delay;
+    trimAge = delay;
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index 85af56e..d060a5b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -32,7 +32,6 @@
 import static org.opends.server.util.StaticUtils.decodeUTF8;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
-import java.io.UnsupportedEncodingException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.opends.messages.MessageBuilder;
@@ -40,14 +39,7 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.types.DebugLogLevel;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
 
 /**
  * This class implements the interface between the underlying database
@@ -61,9 +53,6 @@
   private ReplicationDbEnv dbenv = null;
   private ReplicationServer replicationServer;
 
-  // The maximum number of retries in case of DatabaseDeadlock Exception.
-  private static final int DEADLOCK_RETRIES = 10;
-
   // The lock used to provide exclusive access to the thread that
   // close the db (shutdown or clear).
   private ReentrantReadWriteLock dbCloseLock;
@@ -103,71 +92,41 @@
   public void addEntry(int draftCN, String value, String domainBaseDN,
       ChangeNumber changeNumber)
   {
-    Transaction txn = null;
     try
     {
-      int tries = 0;
-      boolean done = false;
+      DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
+      DatabaseEntry data = new DraftCNData(draftCN,
+          value, domainBaseDN, changeNumber);
 
-      // The database can return a Deadlock Exception if several threads are
-      // accessing the database at the same time. This Exception is a
-      // transient state, when it happens the transaction is aborted and
-      // the operation is attempted again up to DEADLOCK_RETRIES times.
-      while ((tries++ < DEADLOCK_RETRIES) && (!done))
+      // Use a transaction so that we can override durability.
+      Transaction txn = null;
+      dbCloseLock.readLock().lock();
+      try
       {
-        dbCloseLock.readLock().lock();
-        try
-        {
-          txn = dbenv.beginTransaction();
-
-          DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
-          DatabaseEntry data = new DraftCNData(draftCN,
-              value, domainBaseDN, changeNumber);
-          db.put(txn, key, data);
-          txn.commitWriteNoSync();
-          txn = null;
-          done = true;
-        }
-        catch (LockConflictException e)
-        {
-          // Try again.
-        }
-        finally
-        {
-          if (txn != null)
-          {
-            // No effect if txn has committed.
-            txn.abort();
-            txn = null;
-          }
-          dbCloseLock.readLock().unlock();
-        }
+        txn = dbenv.beginTransaction();
+        db.put(txn, key, data);
+        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
       }
-      if (!done)
+      finally
       {
-        // Could not write to the DB after DEADLOCK_RETRIES tries.
-        // This ReplicationServer is not reliable and will be shutdown.
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        logError(mb.toMessage());
-        replicationServer.shutdown();
+        if (txn != null)
+        {
+          // No effect if txn has committed.
+          try
+          {
+            txn.abort();
+          }
+          catch (Exception e)
+          {
+            // Ignored.
+          }
+        }
+        dbCloseLock.readLock().unlock();
       }
     }
     catch (DatabaseException e)
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
     }
   }
 
@@ -229,12 +188,20 @@
   }
 
   private void closeLockedCursor(Cursor cursor)
-  throws DatabaseException
   {
     try
     {
       if (cursor != null)
-        cursor.close();
+      {
+        try
+        {
+          cursor.close();
+        }
+        catch (DatabaseException e)
+        {
+          // Ignore.
+        }
+      }
     }
     finally
     {
@@ -248,12 +215,10 @@
    */
   public int readFirstDraftCN()
   {
-    Cursor cursor = null;
-    String str = null;
-
     try
     {
       dbCloseLock.readLock().lock();
+      Cursor cursor = null;
       try
       {
         cursor = db.openCursor(null, null);
@@ -265,9 +230,8 @@
           /* database is empty */
           return 0;
         }
-        str = decodeUTF8(key.getData());
-        int sn = new Integer(str);
-        return sn;
+
+        return new Integer(decodeUTF8(key.getData()));
       }
       finally
       {
@@ -277,20 +241,7 @@
     catch (DatabaseException e)
     {
       /* database is faulty */
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
-      return 0;
-    }
-    catch (Exception e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       return 0;
     }
   }
@@ -318,12 +269,10 @@
    */
   public int readLastDraftCN()
   {
-    Cursor cursor = null;
-    String str = null;
-
     try
     {
       dbCloseLock.readLock().lock();
+      Cursor cursor = null;
       try
       {
         cursor = db.openCursor(null, null);
@@ -335,9 +284,8 @@
           /* database is empty */
           return 0;
         }
-        str = decodeUTF8(key.getData());
-        int sn = new Integer(str);
-        return sn;
+
+        return new Integer(decodeUTF8(key.getData()));
       }
       finally
       {
@@ -346,16 +294,7 @@
     }
     catch (DatabaseException e)
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
-      return 0;
-    }
-    catch (Exception e)
-    {
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       return 0;
     }
   }
@@ -519,20 +458,7 @@
         isClosed = true;
       }
 
-      boolean closeHasFailed = false;
-
-      try
-      {
-        closeLockedCursor(cursor);
-      }
-      catch (Exception e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        closeHasFailed = true;
-      }
+      closeLockedCursor(cursor);
 
       if (txn != null)
       {
@@ -540,20 +466,11 @@
         {
           txn.commit();
         }
-        catch (Exception e)
+        catch (DatabaseException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          closeHasFailed = true;
+          replicationServer.handleUnexpectedDatabaseException(e);
         }
       }
-
-      if (closeHasFailed)
-      {
-        replicationServer.shutdown();
-      }
     }
 
     /**
@@ -574,26 +491,7 @@
         isClosed = true;
       }
 
-      boolean closeHasFailed = false;
-
-      try
-      {
-        closeLockedCursor(cursor);
-      }
-      catch (LockConflictException e)
-      {
-        // The DB documentation states that a DeadlockException
-        // on the close method of a cursor that is aborting should
-        // be ignored.
-      }
-      catch (Exception e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        closeHasFailed = true;
-      }
+      closeLockedCursor(cursor);
 
       if (txn != null)
       {
@@ -601,20 +499,11 @@
         {
           txn.abort();
         }
-        catch (Exception e)
+        catch (DatabaseException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          closeHasFailed = true;
+          replicationServer.handleUnexpectedDatabaseException(e);
         }
       }
-
-      if (closeHasFailed)
-      {
-        replicationServer.shutdown();
-      }
     }
 
     /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java
index fc4c171..47ce17e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,10 +23,12 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2010 ForgeRock AS.
+ *      Portions Copyright 2010-2011 ForgeRock AS.
  */
 package org.opends.server.replication.server;
 
+import static org.opends.server.util.StaticUtils.getBytes;
+
 import java.io.UnsupportedEncodingException;
 
 import org.opends.messages.Message;
@@ -53,29 +55,14 @@
    * @param value The value (cookie).
    * @param serviceID The serviceID (domain DN).
    * @param changeNumber The replication change number.
-   *
-   * @throws UnsupportedEncodingException When the encoding of the message
-   *         failed because the UTF-8 encoding is not supported.
    */
   public DraftCNData(int draftCN, String value,
       String serviceID, ChangeNumber changeNumber)
-  throws UnsupportedEncodingException
   {
     String record = value
                    + FIELD_SEPARATOR + serviceID
                    + FIELD_SEPARATOR + changeNumber;
-
-    byte[] byteValue;
-    try
-    {
-      byteValue = record.getBytes("UTF-8");
-      this.setData(byteValue);
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      // can't happen
-      return;
-    }
+    setData(getBytes(record));
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index ef4df7c..9230497 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,7 +50,6 @@
 import org.opends.server.types.InitializationException;
 
 import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockConflictException;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -84,16 +83,13 @@
   private DirectoryThread thread = null;
   private ReplicationServer replicationServer;
 
-  // The maximum number of retries in case of DatabaseDeadlock Exception.
-  private static final int DEADLOCK_RETRIES = 10;
-
   /**
    *
    * The trim age in milliseconds. Changes record in the change DB that
    * are older than this age are removed.
    *
    */
-  private long trimage;
+  private long trimAge;
 
   /**
    * Creates a new dbHandler associated to a given LDAP server.
@@ -108,7 +104,7 @@
          throws DatabaseException
   {
     this.replicationServer = replicationServer;
-    this.trimage = replicationServer.getTrimage();
+    this.trimAge = replicationServer.getTrimAge();
 
     // DB initialization
     db = new DraftCNDB(replicationServer, dbenv);
@@ -312,7 +308,7 @@
    */
   public void trim() throws DatabaseException, Exception
   {
-    if (trimage == 0)
+    if (trimAge == 0)
       return;
 
     clear(null);
@@ -330,106 +326,86 @@
    *
    */
   public void clear(String serviceIDToClear)
-  throws DatabaseException, Exception
+      throws DatabaseException, Exception
   {
-    if (this.count()==0)
+    if (this.count() == 0)
+    {
       return;
+    }
 
-    int size = 0;
-    int tries = 0;
-    boolean finished = false;
-    boolean done = false;
+    ChangeNumber crossDomainEligibleCN = replicationServer
+        .getEligibleCN();
 
-    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
-    // In case of deadlock detection by the Database, this thread can
-    // by aborted by a DeadlockException. This is a transient error and
-    // the transaction should be attempted again.
-    // We will try DEADLOCK_RETRIES times before failing.
-    while ((tries++ < DEADLOCK_RETRIES) && (!done))
+    for (int i = 0; i < 100; i++)
     {
       DraftCNDBCursor cursor = db.openDeleteCursor();
       try
       {
-        while ((size < 5000 ) &&  (!finished))
+        for (int j = 0; j < 50; j++)
         {
           // let's traverse the DraftCNDb
           if (!cursor.next())
           {
-            finished=true;
+            cursor.close();
+            return;
           }
-          else
+
+          ChangeNumber cn = cursor.currentChangeNumber();
+
+          // From the draftCNDb change record, get the domain and changeNumber
+          String serviceID = cursor.currentServiceID();
+
+          if ((serviceIDToClear != null)
+              && (serviceIDToClear.equalsIgnoreCase(serviceID)))
           {
-            ChangeNumber cn = cursor.currentChangeNumber();
-
-            // From the draftCNDb change record, get the domain and changeNumber
-            String serviceID = cursor.currentServiceID();
-
-            if ((serviceIDToClear!=null) &&
-                (serviceIDToClear.equalsIgnoreCase(serviceID)))
-            {
-              size++;
-              cursor.delete();
-              continue;
-            }
-
-            ReplicationServerDomain domain =
-              replicationServer.getReplicationServerDomain(serviceID, false);
-
-            if (domain==null)
-            {
-              // the domain has been removed since the record was written in the
-              // draftCNDb, thus it makes no sense to keep the record in the
-              // draftCNDb.
-              size++;
-              cursor.delete();
-            }
-            else
-            {
-              ServerState startState = domain.getStartState();
-
-              // We don't use the returned endState but it's updating CN as
-              // reading
-              domain.getEligibleState(crossDomainEligibleCN);
-
-              ChangeNumber fcn = startState.getMaxChangeNumber(
-                  cn.getServerId());
-
-              int currentKey = cursor.currentKey();
-              // Do not delete the lastKey. This should allow us to
-              // preserve last change number over time.
-              if ((currentKey != lastkey) && (cn.older(fcn)))
-              {
-                size++;
-                cursor.delete();
-              }
-              else
-              {
-                firstkey = currentKey;
-                finished = true;
-              }
-            }
+            cursor.delete();
+            continue;
           }
+
+          ReplicationServerDomain domain = replicationServer
+              .getReplicationServerDomain(serviceID, false);
+
+          if (domain == null)
+          {
+            // the domain has been removed since the record was written in the
+            // draftCNDb, thus it makes no sense to keep the record in the
+            // draftCNDb.
+            cursor.delete();
+            continue;
+          }
+
+          ServerState startState = domain.getStartState();
+
+          // We don't use the returned endState but it's updating CN as
+          // reading
+          domain.getEligibleState(crossDomainEligibleCN);
+
+          ChangeNumber fcn = startState.getMaxChangeNumber(cn
+              .getServerId());
+
+          int currentKey = cursor.currentKey();
+
+          // Do not delete the lastKey. This should allow us to
+          // preserve last change number over time.
+          if ((currentKey != lastkey) && (cn.older(fcn)))
+          {
+            cursor.delete();
+            continue;
+          }
+
+          firstkey = currentKey;
+          cursor.close();
+          return;
         }
+
         cursor.close();
-        done = true;
-      }
-      catch (LockConflictException e)
-      {
-        cursor.abort();
-        if (tries == DEADLOCK_RETRIES)
-        {
-          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
-          // shutdown the ReplicationServer.
-          shutdown = true;
-          throw e;
-        }
       }
       catch (Exception e)
       {
         // mark shutdown for this db so that we don't try again to
         // stop it from cursor.close() or methods called by cursor.close()
-        shutdown = true;
         cursor.abort();
+        shutdown = true;
         throw e;
       }
     }
@@ -492,7 +468,7 @@
    */
   public void setPurgeDelay(long delay)
   {
-    trimage = delay;
+    trimAge = delay;
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 2a6c676..411c776 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.util.List;
-import java.io.UnsupportedEncodingException;
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
 
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
 
 /**
  * This class implements the interface between the underlying database
@@ -64,9 +56,6 @@
   private int serverId;
   private String baseDn;
 
-  // The maximum number of retries in case of DatabaseDeadlock Exception.
-  private static final int DEADLOCK_RETRIES = 10;
-
   // The lock used to provide exclusive access to the thread that
   // close the db (shutdown or clear).
   private ReentrantReadWriteLock dbCloseLock;
@@ -180,98 +169,48 @@
    */
   public void addEntries(List<UpdateMsg> changes)
   {
-    Transaction txn = null;
-
     try
     {
-      int tries = 0;
-      boolean done = false;
-
-      // The database can return a Deadlock Exception if several threads are
-      // accessing the database at the same time. This Exception is a
-      // transient state, when it happens the transaction is aborted and
-      // the operation is attempted again up to DEADLOCK_RETRIES times.
-      while ((tries++ < DEADLOCK_RETRIES) && (!done))
+      dbCloseLock.readLock().lock();
+      try
       {
-        dbCloseLock.readLock().lock();
-        try
+        for (UpdateMsg change : changes)
         {
-          txn = dbenv.beginTransaction();
+          DatabaseEntry key = new ReplicationKey(
+              change.getChangeNumber());
+          DatabaseEntry data = new ReplicationData(change);
 
-          for (UpdateMsg change : changes)
+          if ((counterCurrValue != 0)
+              && (counterCurrValue % counterWindowSize == 0))
           {
-            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
-            DatabaseEntry data = new ReplicationData(change);
-
-            if ((counterCurrValue!=0) &&
-                (counterCurrValue%counterWindowSize == 0))
-            {
-              // enough changes to generate a counter record - wait for the next
-              // change fo time
-              counterTsLimit = change.getChangeNumber().getTime();
-            }
-            if ((counterTsLimit!=0)
-                && (change.getChangeNumber().getTime() != counterTsLimit))
-            {
-              // Write the counter record
-              DatabaseEntry counterKey = new ReplicationKey(
-                  new ChangeNumber(
-                  change.getChangeNumber().getTime(),
-                  0, 0));
-              DatabaseEntry counterValue =
-                encodeCounterValue(counterCurrValue-1);
-              db.put(txn, counterKey, counterValue);
-              counterTsLimit=0;
-            }
-            db.put(txn, key, data);
-            counterCurrValue++;
-
+            // enough changes to generate a counter record - wait for the next
+            // change of time
+            counterTsLimit = change.getChangeNumber().getTime();
           }
-          txn.commitWriteNoSync();
-          txn = null;
-          done = true;
-        }
-        catch (LockConflictException e)
-        {
-          // Try again.
-        }
-        finally
-        {
-          if (txn != null)
+          if ((counterTsLimit != 0)
+              && (change.getChangeNumber().getTime() != counterTsLimit))
           {
-            // No effect if txn has committed.
-            txn.abort();
-            txn = null;
+            // Write the counter record
+            DatabaseEntry counterKey = new ReplicationKey(
+                new ChangeNumber(change.getChangeNumber().getTime(),
+                    0, 0));
+            DatabaseEntry counterValue =
+              encodeCounterValue(counterCurrValue - 1);
+            db.put(null, counterKey, counterValue);
+            counterTsLimit = 0;
           }
-
-          dbCloseLock.readLock().unlock();
+          db.put(null, key, data);
+          counterCurrValue++;
         }
       }
-      if (!done)
+      finally
       {
-        // Could not write to the DB after DEADLOCK_RETRIES tries.
-        // This ReplicationServer is not reliable and will be shutdown.
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        logError(mb.toMessage());
-        replicationServer.shutdown();
+        dbCloseLock.readLock().unlock();
       }
     }
     catch (DatabaseException e)
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
     }
   }
 
@@ -334,13 +273,24 @@
     return new ReplServerDBCursor();
   }
 
+
+
   private void closeLockedCursor(Cursor cursor)
-    throws DatabaseException
+      throws DatabaseException
   {
     try
     {
       if (cursor != null)
-        cursor.close();
+      {
+        try
+        {
+          cursor.close();
+        }
+        catch (DatabaseException e)
+        {
+          // Ignore.
+        }
+      }
     }
     finally
     {
@@ -358,53 +308,53 @@
     String str = null;
     ChangeNumber cn = null;
 
-    dbCloseLock.readLock().lock();
     try
     {
-      DatabaseEntry key = new DatabaseEntry();
-      DatabaseEntry data = new DatabaseEntry();
-
-      cursor = db.openCursor(null, null);
-
-      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
-
-      if (status != OperationStatus.SUCCESS)
+      dbCloseLock.readLock().lock();
+      try
       {
-        /* database is empty */
-        return null;
-      }
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
 
-      str = decodeUTF8(key.getData());
-      cn = new ChangeNumber(str);
-      if (ReplicationDB.isaCounter(cn))
-      {
-        // First record is a counter record .. go next
-        status = cursor.getNext(key, data, LockMode.DEFAULT);
+        cursor = db.openCursor(null, null);
+
+        OperationStatus status = cursor.getFirst(key, data,
+            LockMode.DEFAULT);
+
         if (status != OperationStatus.SUCCESS)
         {
-          // DB contains only a counter record
+          /* database is empty */
           return null;
         }
-        else
+
+        str = decodeUTF8(key.getData());
+        cn = new ChangeNumber(str);
+        if (ReplicationDB.isaCounter(cn))
         {
-          cn = new ChangeNumber(decodeUTF8(key.getData()));
+          // First record is a counter record .. go next
+          status = cursor.getNext(key, data, LockMode.DEFAULT);
+          if (status != OperationStatus.SUCCESS)
+          {
+            // DB contains only a counter record
+            return null;
+          }
+          else
+          {
+            cn = new ChangeNumber(decodeUTF8(key.getData()));
+          }
         }
       }
+      finally
+      {
+        closeLockedCursor(cursor);
+      }
     }
     catch (DatabaseException e)
     {
       /* database is faulty */
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       cn = null;
     }
-    finally
-    {
-      closeLockedCursor(cursor);
-    }
     return cn;
   }
 
@@ -420,57 +370,56 @@
     Cursor cursor = null;
     ChangeNumber cn = null;
 
-    dbCloseLock.readLock().lock();
     try
     {
-      DatabaseEntry key = new DatabaseEntry();
-      DatabaseEntry data = new DatabaseEntry();
-
-      cursor = db.openCursor(null, null);
-
-      OperationStatus status = cursor.getLast(key, data,
-          LockMode.DEFAULT);
-
-      if (status != OperationStatus.SUCCESS)
+      dbCloseLock.readLock().lock();
+      try
       {
-        /* database is empty */
-        return null;
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+
+        cursor = db.openCursor(null, null);
+
+        OperationStatus status = cursor.getLast(key, data,
+            LockMode.DEFAULT);
+
+        if (status != OperationStatus.SUCCESS)
+        {
+          /* database is empty */
+          return null;
+        }
+
+        String str = decodeUTF8(key.getData());
+        cn = new ChangeNumber(str);
+        if (ReplicationDB.isaCounter(cn))
+        {
+          if (cursor.getPrev(key, data, LockMode.DEFAULT)
+              != OperationStatus.SUCCESS)
+          {
+            /*
+             * database only contain a counter record - don't know how much it
+             * can be possible but ...
+             */
+            cn = null;
+          }
+          else
+          {
+            str = decodeUTF8(key.getData());
+            cn = new ChangeNumber(str);
+            // There can't be 2 counter record next to each other
+          }
+        }
       }
-
-      String str = decodeUTF8(key.getData());
-      cn = new ChangeNumber(str);
-      if (ReplicationDB.isaCounter(cn))
+      finally
       {
-        if (cursor.getPrev(key, data,
-            LockMode.DEFAULT) != OperationStatus.SUCCESS)
-        {
-          /*
-           * database only contain a counter record - don't know how much it can
-           * be possible but ...
-           */
-          cn = null;
-        }
-        else
-        {
-          str = decodeUTF8(key.getData());
-          cn= new ChangeNumber(str);
-          // There can't be 2 counter record next to each other
-        }
+        closeLockedCursor(cursor);
       }
     }
     catch (DatabaseException e)
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       cn = null;
     }
-    finally
-    {
-      closeLockedCursor(cursor);
-    }
 
     return cn;
   }
@@ -496,85 +445,80 @@
     DatabaseEntry key = new ReplicationKey(changeNumber);
     DatabaseEntry data = new DatabaseEntry();
 
-    Transaction txn = null;
-
-    dbCloseLock.readLock().lock();
     try
     {
-      cursor = db.openCursor(txn, null);
-      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
-              OperationStatus.SUCCESS)
+      dbCloseLock.readLock().lock();
+      try
       {
-        // We can move close to the changeNumber.
-        // Let's move to the previous change.
-        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
-                OperationStatus.SUCCESS)
+        cursor = db.openCursor(null, null);
+        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
+            == OperationStatus.SUCCESS)
         {
-          String str = decodeUTF8(key.getData());
-          cn = new ChangeNumber(str);
-          if (ReplicationDB.isaCounter(cn))
+          // We can move close to the changeNumber.
+          // Let's move to the previous change.
+          if (cursor.getPrev(key, data, LockMode.DEFAULT)
+              == OperationStatus.SUCCESS)
           {
-            if (cursor.getPrev(key, data,
-                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
+            String str = decodeUTF8(key.getData());
+            cn = new ChangeNumber(str);
+            if (ReplicationDB.isaCounter(cn))
             {
-              // database starts with a counter record.
-              cn = null;
+              if (cursor.getPrev(key, data, LockMode.DEFAULT)
+                  != OperationStatus.SUCCESS)
+              {
+                // database starts with a counter record.
+                cn = null;
+              }
+              else
+              {
+                str = decodeUTF8(key.getData());
+                cn = new ChangeNumber(str);
+                // There can't be 2 counter record next to each other
+              }
             }
-            else
+          }
+          // else, there was no change previous to our changeNumber.
+        }
+        else
+        {
+          // We could not move the cursor past to the changeNumber
+          // Check if the last change is older than changeNumber
+          if (cursor.getLast(key, data, LockMode.DEFAULT)
+              == OperationStatus.SUCCESS)
+          {
+            String str = decodeUTF8(key.getData());
+            cn = new ChangeNumber(str);
+            if (ReplicationDB.isaCounter(cn))
             {
-              str = decodeUTF8(key.getData());
-              cn= new ChangeNumber(str);
-              // There can't be 2 counter record next to each other
+              if (cursor.getPrev(key, data, LockMode.DEFAULT)
+                  != OperationStatus.SUCCESS)
+              {
+                /*
+                 * database only contain a counter record, should not be
+                 * possible, but Ok, let's just say no change Number
+                 */
+                cn = null;
+              }
+              else
+              {
+                str = decodeUTF8(key.getData());
+                cn = new ChangeNumber(str);
+                // There can't be 2 counter record next to each other
+              }
             }
           }
         }
-        // else, there was no change previous to our changeNumber.
       }
-      else
+      finally
       {
-        // We could not move the cursor past to the changeNumber
-        // Check if the last change is older than changeNumber
-        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
-                OperationStatus.SUCCESS)
-        {
-          String str = decodeUTF8(key.getData());
-          cn = new ChangeNumber(str);
-          if (ReplicationDB.isaCounter(cn))
-          {
-            if (cursor.getPrev(key, data,
-              LockMode.DEFAULT) != OperationStatus.SUCCESS)
-            {
-              /*
-               * database only contain a counter record, should not be
-               * possible, but Ok, let's just say no change Number
-               */
-              cn = null;
-            }
-            else
-            {
-              str = decodeUTF8(key.getData());
-              cn= new ChangeNumber(str);
-              // There can't be 2 counter record next to each other
-            }
-          }
-        }
+        closeLockedCursor(cursor);
       }
     }
     catch (DatabaseException e)
     {
-      /* database is faulty */
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      // TODO: Verify if shutting down replication is the right thing to do
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       cn = null;
     }
-    finally
-    {
-      closeLockedCursor(cursor);
-    }
     return cn;
   }
 
@@ -669,14 +613,7 @@
       catch (Exception e)
       {
         // Unlocking is required before throwing any exception
-        try
-        {
-          closeLockedCursor(localCursor);
-        }
-        catch (Exception ignore)
-        {
-          // Ignore.
-        }
+        closeLockedCursor(localCursor);
         throw e;
       }
     }
@@ -703,14 +640,7 @@
       }
       catch (Exception e)
       {
-        try
-        {
-          closeLockedCursor(localCursor);
-        }
-        catch (Exception ignore)
-        {
-          // Ignore.
-        }
+        closeLockedCursor(localCursor);
 
         if (localTxn != null)
         {
@@ -741,41 +671,19 @@
         isClosed = true;
       }
 
-      boolean closeHasFailed = false;
-
-      try
-      {
-        closeLockedCursor(cursor);
-      }
-      catch (DatabaseException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        closeHasFailed = true;
-      }
-
+      closeLockedCursor(cursor);
       if (txn != null)
       {
         try
         {
-          txn.commit();
+          // No need for durability when purging.
+          txn.commit(Durability.COMMIT_NO_SYNC);
         }
         catch (DatabaseException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          closeHasFailed = true;
+          replicationServer.handleUnexpectedDatabaseException(e);
         }
       }
-
-      if (closeHasFailed)
-      {
-        replicationServer.shutdown();
-      }
     }
 
     /**
@@ -796,26 +704,7 @@
         isClosed = true;
       }
 
-      boolean closeHasFailed = false;
-
-      try
-      {
-        closeLockedCursor(cursor);
-      }
-      catch (LockConflictException e)
-      {
-        // The DB documentation states that a DeadlockException
-        // on the close method of a cursor that is aborting should
-        // be ignored.
-      }
-      catch (DatabaseException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        closeHasFailed = true;
-      }
+      closeLockedCursor(cursor);
 
       if (txn != null)
       {
@@ -825,18 +714,9 @@
         }
         catch (DatabaseException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          closeHasFailed = true;
+          replicationServer.handleUnexpectedDatabaseException(e);
         }
       }
-
-      if (closeHasFailed)
-      {
-        replicationServer.shutdown();
-      }
     }
 
     /**
@@ -972,143 +852,140 @@
     int distToCounterRecord1 = 0;
     int distBackToCounterRecord2 = 0;
     int count=0;
-    Cursor cursor = null;
-    Transaction txn = null;
     OperationStatus status;
     try
     {
-      ChangeNumber cn ;
-
-      if ((start==null)&&(stop==null))
-        return (int)db.count();
-
-      // Step 1 : from the start point, traverse db to the next counter record
-      // or to the stop point.
-      DatabaseEntry key = new DatabaseEntry();
-      DatabaseEntry data = new DatabaseEntry();
-      cursor = db.openCursor(txn, null);
-      if (start != null)
+      Cursor cursor = null;
+      try
       {
-        key = new ReplicationKey(start);
-        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
-        if (status == OperationStatus.NOTFOUND)
-          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
-      }
-      else
-      {
-        status = cursor.getNext(key, data, LockMode.DEFAULT);
-      }
+        ChangeNumber cn ;
 
-      while (status == OperationStatus.SUCCESS)
-      {
-        // test whether the record is a regular change or a counter
-        String csnString = decodeUTF8(key.getData());
-        cn = new ChangeNumber(csnString);
-        if (cn.getServerId() != 0)
+        if ((start==null)&&(stop==null))
+          return (int)db.count();
+
+        // Step 1 : from the start point, traverse db to the next counter record
+        // or to the stop point.
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+        cursor = db.openCursor(null, null);
+        if (start != null)
         {
-          // reached a regular change record
-          // test whether we reached the 'stop' target
-          if (!cn.newer(stop))
-          {
-            // let's loop
-            distToCounterRecord1++;
-            status = cursor.getNext(key, data, LockMode.DEFAULT);
-          }
-          else
-          {
-            // reached the end
-            break;
-          }
+          key = new ReplicationKey(start);
+          status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+          if (status == OperationStatus.NOTFOUND)
+            status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
         }
         else
         {
-          // counter record
-          counterRecord1 = decodeCounterValue(data.getData());
-          break;
+          status = cursor.getNext(key, data, LockMode.DEFAULT);
         }
-      }
-      cursor.close();
 
-      // cases
-      //
-      if (counterRecord1==0)
-        return distToCounterRecord1;
+        while (status == OperationStatus.SUCCESS)
+        {
+          // test whether the record is a regular change or a counter
+          String csnString = decodeUTF8(key.getData());
+          cn = new ChangeNumber(csnString);
+          if (cn.getServerId() != 0)
+          {
+            // reached a regular change record
+            // test whether we reached the 'stop' target
+            if (!cn.newer(stop))
+            {
+              // let's loop
+              distToCounterRecord1++;
+              status = cursor.getNext(key, data, LockMode.DEFAULT);
+            }
+            else
+            {
+              // reached the end
+              break;
+            }
+          }
+          else
+          {
+            // counter record
+            counterRecord1 = decodeCounterValue(data.getData());
+            break;
+          }
+        }
+        cursor.close();
 
-      // Step 2 : from the stop point, traverse db to the next counter record
-      // or to the start point.
-      txn = null;
-      data = new DatabaseEntry();
-      key = new ReplicationKey(stop);
-      cursor = db.openCursor(txn, null);
-      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
-      if (status == OperationStatus.SUCCESS)
-      {
-        cn = new ChangeNumber(decodeUTF8(key.getData()));
-      }
-      else
-      {
-        key = new DatabaseEntry();
+        // cases
+        //
+        if (counterRecord1==0)
+          return distToCounterRecord1;
+
+        // Step 2 : from the stop point, traverse db to the next counter record
+        // or to the start point.
         data = new DatabaseEntry();
-        status = cursor.getLast(key, data, LockMode.DEFAULT);
-        if (status != OperationStatus.SUCCESS)
+        key = new ReplicationKey(stop);
+        cursor = db.openCursor(null, null);
+        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+        if (status == OperationStatus.SUCCESS)
         {
-          /* database is empty */
-          return 0;
+          cn = new ChangeNumber(decodeUTF8(key.getData()));
         }
-      }
-      while (status == OperationStatus.SUCCESS)
-      {
-        cn = new ChangeNumber(decodeUTF8(key.getData()));
-        if (!ReplicationDB.isaCounter(cn))
+        else
         {
-          // regular change record
-          if (!cn.older(start))
+          key = new DatabaseEntry();
+          data = new DatabaseEntry();
+          status = cursor.getLast(key, data, LockMode.DEFAULT);
+          if (status != OperationStatus.SUCCESS)
           {
-            distBackToCounterRecord2++;
-            status = cursor.getPrev(key, data, LockMode.DEFAULT);
+            /* database is empty */
+            return 0;
+          }
+        }
+        while (status == OperationStatus.SUCCESS)
+        {
+          cn = new ChangeNumber(decodeUTF8(key.getData()));
+          if (!ReplicationDB.isaCounter(cn))
+          {
+            // regular change record
+            if (!cn.older(start))
+            {
+              distBackToCounterRecord2++;
+              status = cursor.getPrev(key, data, LockMode.DEFAULT);
+            }
+            else
+              break;
           }
           else
+          {
+            // counter record
+            counterRecord2 = decodeCounterValue(data.getData());
             break;
+          }
         }
-        else
+        cursor.close();
+
+        // Step 3 : Now consolidates the result
+        if (counterRecord1!=0)
         {
-          // counter record
-          counterRecord2 = decodeCounterValue(data.getData());
-          break;
+          if (counterRecord1 == counterRecord2)
+          {
+            // only one cp between from and to - no need to use it
+            count = distToCounterRecord1 + distBackToCounterRecord2;
+          }
+          else
+          {
+            // 2 cp between from and to
+            count = distToCounterRecord1 + (counterRecord2-counterRecord1)
+            + distBackToCounterRecord2;
+          }
         }
       }
-      cursor.close();
-
-      // Step 3 : Now consolidates the result
-      if (counterRecord1!=0)
+      finally
       {
-        if (counterRecord1 == counterRecord2)
+        if (cursor != null)
         {
-          // only one cp between from and to - no need to use it
-          count = distToCounterRecord1 + distBackToCounterRecord2;
-        }
-        else
-        {
-          // 2 cp between from and to
-          count = distToCounterRecord1 + (counterRecord2-counterRecord1)
-            + distBackToCounterRecord2;
+          cursor.close();
         }
       }
     }
-    finally
+    catch (DatabaseException e)
     {
-      if (cursor != null)
-        cursor.close();
-      if (txn != null)
-      {
-        try
-        {
-          txn.abort();
-        } catch (DatabaseException e1)
-        {
-          // can't do much more. The ReplicationServer is shutting down.
-        }
-      }
+      replicationServer.handleUnexpectedDatabaseException(e);
     }
     return count;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java
index 52022f9..867f052 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2010 ForgeRock AS.
+ *      Portions Copyright 2010-2011 ForgeRock AS.
  */
 package org.opends.server.replication.server;
 
@@ -47,16 +47,20 @@
    * Creates a new ReplicationData object from an UpdateMsg.
    *
    * @param change the UpdateMsg used to create the ReplicationData.
-   *
-   * @throws UnsupportedEncodingException When the encoding of the message
-   *         failed because the UTF-8 encoding is not supported.
    */
   public ReplicationData(UpdateMsg change)
-         throws UnsupportedEncodingException
   {
     // Always keep messages in the replication DB with the current protocol
     // version
-    this.setData(change.getBytes());
+    try
+    {
+      this.setData(change.getBytes());
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      // This should not happen - UTF-8 is always available.
+      throw new RuntimeException(e);
+    }
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index cb34f92..7c28c16 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -40,16 +40,8 @@
 import java.io.File;
 import java.io.UnsupportedEncodingException;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
+
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -92,10 +84,24 @@
      */
     envConfig.setAllowCreate(true);
     envConfig.setTransactional(true);
-    envConfig.setConfigParam("je.cleaner.expunge", "true");
     envConfig.setConfigParam("je.cleaner.threads", "2");
     envConfig.setConfigParam("je.checkpointer.highPriority", "true");
 
+    // If the JVM is reasonably large then we can safely default to
+    // bigger read buffers. This will result in more scalable checkpointer
+    // and cleaner performance.
+    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
+    {
+      envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", String
+          .valueOf(2 * 1024 * 1024));
+
+      envConfig.setConfigParam("je.log.iteratorReadSize", String
+          .valueOf(2 * 1024 * 1024));
+
+      envConfig.setConfigParam("je.log.faultReadSize", String
+          .valueOf(4 * 1024));
+    }
+
     // Tests have shown that since the parsing of the Replication log is always
     // done sequentially, it is not necessary to use a large DB cache.
     // Use 5M so that the replication can be used with 64M total for the JVM.
@@ -103,9 +109,14 @@
 
     // Since records are always added at the end of the Replication log and
     // deleted at the beginning of the Replication log, this should never
-    // cause any deadlock. It is therefore safe to increase the TXN timeout
-    // to 10 seconds.
-    envConfig.setTxnTimeout(10, TimeUnit.SECONDS);
+    // cause any deadlock.
+    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
+    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
+
+    // Since replication provides durability, we can reduce the DB durability
+    // level so that we are immune to application / JVM crashes.
+    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
+
     dbEnvironment = new Environment(new File(path), envConfig);
 
     /*
@@ -120,7 +131,6 @@
 
     stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
     start();
-
   }
 
   /**
@@ -316,7 +326,7 @@
               TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
                 " serverId/Domain=<"+stringId+">");
             stateDb.put(txn, key, data);
-            txn.commitWriteNoSync();
+            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
           } catch (DatabaseException dbe)
           {
             // Abort the txn and propagate the Exception to the caller
@@ -347,7 +357,7 @@
                   "Created in the state Db record Tag/Domain/GenId key=" +
                   stringId + " value=" + dataStringId);
             stateDb.put(txn, key, data);
-            txn.commitWriteNoSync();
+            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
           } catch (DatabaseException dbe)
           {
             // Abort the txn and propagate the Exception to the caller
@@ -432,7 +442,7 @@
           try
           {
             stateDb.delete(txn, key);
-            txn.commitWriteNoSync();
+            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
             if (debugEnabled())
               TRACER.debugInfo(
                 "In " + this.replicationServer.getMonitorInstanceName() +
@@ -495,7 +505,7 @@
           try {
             data.setData(byteId);
             stateDb.delete(txn, key);
-            txn.commitWriteNoSync();
+            txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
             if (debugEnabled())
               TRACER.debugInfo(
                   " In " + this.replicationServer.getMonitorInstanceName() +
@@ -532,7 +542,7 @@
       {
         txn = dbEnvironment.beginTransaction(null, null);
         dbEnvironment.truncateDatabase(txn, databaseName, false);
-        txn.commitWriteNoSync();
+        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
         txn = null;
       }
       catch (DatabaseException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index a21b51b..5c96d23 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -32,6 +32,7 @@
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.ServerConstants.EOL;
 import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.File;
 import java.io.IOException;
@@ -992,7 +993,7 @@
    * @return  The time after which changes must be deleted from the
    *          persistent storage (in milliseconds).
    */
-  long getTrimage()
+  long getTrimAge()
   {
     return purgeDelay * 1000;
   }
@@ -2002,4 +2003,24 @@
     }
   }
 
+
+
+  /**
+   * Shuts down replication when an unexpected database exception occurs. Note
+   * that we do not expect lock timeouts or txn timeouts because the replication
+   * databases are deadlock free, thus all operations should complete
+   * eventually.
+   *
+   * @param e
+   *          The unexpected database exception.
+   */
+  void handleUnexpectedDatabaseException(DatabaseException e)
+  {
+    MessageBuilder mb = new MessageBuilder();
+    mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+    mb.append(stackTraceToSingleLineString(e));
+    logError(mb.toMessage());
+    shutdown();
+  }
+
 }

--
Gitblit v1.10.0