mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
01.40.2008 7e6c6657bced35f4a3aba723c2add20923450ad6
The Replication Server thread that is reading changes from the database
to propagate them to the Directory Servers was sometimes leaving some
Database cursors open.

This was leading to Deadlock Exceptions that was causing shutdown of
the Replication Server

When the 2 Replication Servers used in this test were dead the
Replication was dead.

These changes fix the code so that we always release the cursors
and catch the Deadlock Exception in case we would meet transient
Deadlock even though we believe it's not possible with the current
code.
5 files modified
185 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 40 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 112 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,6 +51,7 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DeadlockException;
/**
 * This class is used for managing the replicationServer database for each
@@ -99,6 +100,9 @@
  final static int MSG_QUEUE_HIMARK = 5000;
  final static int MSG_QUEUE_LOWMARK = 4000;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
@@ -285,7 +289,10 @@
      }
    }
    return new ReplicationIterator(serverId, db, changeNumber);
    ReplicationIterator it =
      new ReplicationIterator(serverId, db, changeNumber);
    return it;
  }
  /**
@@ -397,17 +404,25 @@
      return;
    int size = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
        (short) 0, (short)0);
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    {
    /* the trim is done by group in order to save some CPU and IO bandwidth
     * start the transaction then do a bunch of remove then commit
     */
    ReplServerDBCursor cursor;
    cursor = db.openDeleteCursor();
    try {
      try
      {
      while ((size < 5000 ) &&  (!finished))
      {
        ChangeNumber changeNumber = cursor.nextChangeNumber();
@@ -428,17 +443,30 @@
        else
          finished = true;
      }
      cursor.close();
    } catch (DatabaseException e)
        done = true;
      }
      catch (DeadlockException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw (e);
        }
      }
      catch (DatabaseException e)
    {
      // mark shutdown for this db so that we don't try again to
      // stop it from cursor.close() or methods called by cursor.close()
      shutdown = true;
      cursor.close();
        cursor.abort();
      throw (e);
    }
  }
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -42,6 +42,7 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.je.DeadlockException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -59,6 +60,9 @@
  private Short serverId;
  private DN baseDn;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
@@ -95,28 +99,49 @@
    try
    {
      int tries = 0;
      boolean done = false;
      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        try
        {
      txn = dbenv.beginTransaction();
      for (UpdateMessage change : changes)
      {
        DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
        DatabaseEntry data = new ReplicationData(change);
        try
        {
          db.put(txn, key, data);
        } catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
        }
      }
      txn.commitWriteNoSync();
      txn = null;
          done = true;
        }
        catch (DeadlockException e)
        {
          txn.abort();
          txn = null;
        }
      }
      if (!done)
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        if (txn != null)
        {
           txn.abort();
        }
        replicationServer.shutdown();
      }
    }
    catch (DatabaseException e)
    {
@@ -332,6 +357,8 @@
    {
      cursor = db.openCursor(txn, null);
      try
      {
      if (startingChangeNumber != null)
      {
        key = new ReplicationKey(startingChangeNumber);
@@ -352,12 +379,19 @@
            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
             OperationStatus.SUCCESS)
           {
                cursor.close();
             cursor = db.openCursor(txn, null);
           }
          }
        }
      }
    }
      catch (Exception e)
      {
        cursor.close();
        throw (e);
      }
    }
    private ReplServerDBCursor() throws DatabaseException
    {
@@ -370,13 +404,15 @@
     */
    public void close()
    {
      if (cursor == null)
        return;
      try
      {
        if (cursor != null)
        {
        cursor.close();
        cursor = null;
      } catch (DatabaseException e)
        }
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
@@ -401,6 +437,52 @@
    }
    /**
     * Abort the Cursor after a Deadlock Exception.
     * This method catch and ignore the DeadlockException because
     * this must be done when aborting a cursor after a DeadlockException
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
    public void abort()
    {
      if (cursor == null)
        return;
      try
      {
        cursor.close();
        cursor = null;
      }
      catch (DeadlockException e1)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
        }
      }
    }
    /**
     * Get the next ChangeNumber in the database from this Cursor.
     *
     * @return The next ChangeNumber in the database from this cursor.
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -22,16 +22,16 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import com.sleepycat.je.DatabaseException;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
/**
 * This class allows to iterate through the changes received from a given
 * LDAP Server Identifier.
@@ -43,6 +43,9 @@
  /**
   * Creates a new ReplicationIterator.
   * All created iterator must be released by the caller using the
   * releaseCursor() method.
   *
   * @param id the Identifier of the server on which the iterator applies.
   * @param db The db where the iterator must be created.
   * @param changeNumber The ChangeNumber after which the iterator must start.
@@ -56,7 +59,9 @@
  {
    cursor = db.openReadCursor(changeNumber);
    if (cursor == null)
    {
      throw new Exception("no new change");
    }
     if (this.next() == false)
     {
       cursor.close();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -572,6 +572,9 @@
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the ressources
   * and locks used by the ReplicationIterator.
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -857,16 +857,6 @@
    if (olderUpdateCN == null)
      return null;
    ReplicationIterator ri =
      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
    if (ri != null)
    {
      if (ri.next())
      {
        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
        return firstMissingChange.getTime();
      }
    }
    return olderUpdateCN.getTime();
  }
@@ -1081,10 +1071,17 @@
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if ((iterator != null) && (iterator.getChange() != null))
            if (iterator != null)
            {
              if (iterator.getChange() != null)
            {
              iteratorSortedSet.add(iterator);
            }
              else
              {
                iterator.releaseCursor();
              }
            }
          }
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          {