mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.30.2013 5bf287bc9f92c5b0893e1dade87453be153d07c1
OPENDJ-1172 Deadlock between replication threads during shutdown

Review of the approach: Matthew Swift


Problem is caused by code deep into method calls that calls ReplicationServer.shutdown().
Thread 1 holds a lock on MessageHandler.msgQueue, an exception happens during processing and it then it calls ReplicationServer.shutdown() which then goes and tries to grab JEReplicaDB.msgQueue.
Thread 2 holds a lock on JEReplicaDB.msgQueue and then tries to grab MessageHandler.msgQueue.

The proper fix is to let the exceptions bubble up to the Thread.run() method, releasing all locks in the process, and call ReplicationServer.shutdown() from there.


replication.properties
Added stack traces to error messages.

ReplicationServerDomain.java:
Consequence of the change to the error messages, removed the use of MessageBuilder.

JEUtils.java: ADDED
Factorized all the code closing JE Transactions.

DraftCNDB.java, JEChangeNumberIndexDB.java, ReplicationDB.java:
Let ChangelogExceptions propagate up.
Used JEUtils.abort().
Consequence of the change to the error messages, removed the use of MessageBuilder.

JEReplicaDB.java:
Handled ChangelogException bubbling up here.
Extracted stop(Exception) method.

ReplicationDbEnv.java
removed one shutdownOnException() method.
Inlined innerShutdownOnException().
Consequence of the change to the error messages, removed the use of MessageBuilder.
1 files added
7 files modified
350 ■■■■ changed files
opends/src/messages/messages/replication.properties 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java 81 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 28 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 55 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java 74 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 73 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 26 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -77,13 +77,13 @@
 %s : %s
SEVERE_ERR_EXCEPTION_DECODING_OPERATION_25=Error trying to replay %s, \
 operation could not be decoded :
FATAL_ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR_26=Error Trying to use the \
 underlying database. The Replication Server is going to shut down
FATAL_ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR_26=Error trying to use the \
 underlying database. The Replication Server is going to shut down: %s
SEVERE_ERR_ERROR_CLOSING_CHANGELOG_ENV_28=Error closing the Replication Server \
 database : %s
SEVERE_ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH_29=Error during the Replication \
 Server database trimming or flush process. The Changelog service is going to \
 shutdown
 shutdown: %s
SEVERE_ERR_WRITER_UNEXPECTED_EXCEPTION_32=An unexpected error happened \
 handling connection with %s.  This connection is going to be closed
SEVERE_ERR_RS_ERROR_SENDING_ACK_33=In replication server %s: an unexpected error \
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -429,11 +429,8 @@
       * job properly anymore and needs to close all its connections and
       * shutdown itself.
       */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(" ");
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      logError(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR
          .get(stackTraceToSingleLineString(e)));
      localReplicationServer.shutdown();
      return false;
    }
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -67,7 +67,7 @@
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param dbenv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened.
   * @throws ChangelogException If a database problem happened
   */
  public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException
  {
@@ -82,8 +82,11 @@
   *
   * @param record
   *          the provided {@link ChangeNumberIndexRecord} to be stored.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void addRecord(ChangeNumberIndexRecord record)
      throws ChangelogException
  {
    try
    {
@@ -110,40 +113,13 @@
      }
      finally
      {
        abort(txn);
        JEUtils.abort(txn);
        dbCloseLock.readLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
    }
    catch (ChangelogException e)
    {
      dbenv.shutdownOnException(e);
    }
  }
  /**
   * Aborts the current transaction. It has no effect if the transaction has
   * committed.
   *
   * @param txn
   *          the transaction to abort
   */
  private static void abort(Transaction txn)
  {
    if (txn != null)
    {
      try
      {
        txn.abort();
      }
      catch (DatabaseException ignored)
      {
        // Ignore.
        TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
      }
      throw new ChangelogException(e);
    }
  }
@@ -173,9 +149,9 @@
   * Create a cursor that can be used to search or iterate on this DB.
   *
   * @param changeNumber The change number from which the cursor must start.
   * @return The ReplServerDBCursor
   * @throws ChangelogException If a database error prevented the cursor
   *                           creation.
   * @return The ReplServerDBCursor.
   */
  public DraftCNDBCursor openReadCursor(long changeNumber)
      throws ChangelogException
@@ -187,9 +163,9 @@
   * Create a cursor that can be used to delete some record from this
   * ReplicationServer database.
   *
   * @return The ReplServerDBCursor
   * @throws ChangelogException If a database error prevented the cursor
   *                           creation.
   * @return The ReplServerDBCursor.
   */
  public DraftCNDBCursor openDeleteCursor() throws ChangelogException
  {
@@ -246,8 +222,7 @@
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
      return null;
      throw new ChangelogException(e);
    }
  }
@@ -323,8 +298,7 @@
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
      return null;
      throw new ChangelogException(e);
    }
  }
@@ -360,7 +334,7 @@
     * @param startChangeNumber
     *          the change number from which the cursor must start.
     * @throws ChangelogException
     *           when the startChangeNumber does not exist.
     *           If a database problem happened
     */
    private DraftCNDBCursor(long startChangeNumber) throws ChangelogException
    {
@@ -469,13 +443,13 @@
      catch (DatabaseException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        DraftCNDB.abort(localTxn);
        JEUtils.abort(localTxn);
        throw new ChangelogException(e);
      }
      catch (ChangelogException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        DraftCNDB.abort(localTxn);
        JEUtils.abort(localTxn);
        throw e;
      }
      finally
@@ -521,9 +495,9 @@
    }
    /**
     * Abort the Cursor after a Deadlock Exception.
     * This method catch and ignore the DeadlockException because
     * this must be done when aborting a cursor after a DeadlockException
     * Abort the Cursor after a DatabaseException.
     * This method catch and ignore the DatabaseException because
     * this must be done when aborting a cursor after a DatabaseException
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
@@ -539,18 +513,7 @@
      }
      closeLockedCursor(cursor);
      if (txn != null)
      {
        try
        {
          txn.abort();
        }
        catch (DatabaseException e)
        {
          dbenv.shutdownOnException(e);
        }
      }
      JEUtils.abort(txn);
    }
    /**
@@ -570,8 +533,10 @@
    /**
     * Go to the next record on the cursor.
     *
     * @return the next record on this cursor.
     * @throws ChangelogException a.
     * @throws ChangelogException
     *           If a database problem happened
     */
    public boolean next() throws ChangelogException
    {
@@ -600,7 +565,8 @@
    /**
     * Delete the record at the current cursor position.
     *
     * @throws ChangelogException In case of database problem.
     * @throws ChangelogException
     *           If a database problem happened
     */
    public void delete() throws ChangelogException
    {
@@ -630,7 +596,8 @@
  /**
   * Clears this change DB from the changes it contains.
   *
   * @throws ChangelogException Throws a DatabaseException when it occurs.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clear() throws ChangelogException
  {
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -283,15 +282,15 @@
            Thread.currentThread().interrupt();
          }
        }
      } catch (Exception end)
      }
      catch (Exception end)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(end)));
        if (replicationServer != null)
        {
          replicationServer.shutdown();
        }
        break;
      }
      try {
@@ -309,13 +308,12 @@
        }
      } catch (Exception end)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(end)));
        if (replicationServer != null)
        {
          replicationServer.shutdown();
        }
        break;
      }
    }
@@ -454,18 +452,12 @@
      }
      catch (ChangelogException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        shutdown.set(true);
        throw e;
      }
      catch (Exception e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        shutdown.set(true);
        throw new ChangelogException(e);
      }
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -31,7 +31,6 @@
import java.util.LinkedList;
import java.util.List;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -270,7 +269,7 @@
   * @return a new {@link DBCursor} that allows to browse the db managed by this
   *         ReplicaDB and starting at the position defined by a given CSN.
   * @throws ChangelogException
   *           if a database problem happened.
   *           if a database problem happened
   */
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
      throws ChangelogException
@@ -325,8 +324,17 @@
    while (msgQueue.size() != 0)
    {
      try
      {
      flush();
    }
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(e)));
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -372,26 +380,22 @@
        }
        catch (Exception end)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
          mb.append(" ");
          mb.append(stackTraceToSingleLineString(end));
          logError(mb.toMessage());
          thread.initiateShutdown();
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
          stop(end);
          break;
        }
      }
      try
      {
      // call flush a last time before exiting to make sure that
      // no change was forgotten in the msgQueue
      flush();
    }
      catch (ChangelogException e)
      {
        stop(e);
      }
    }
    finally
    {
      thread.stopWork();
@@ -403,6 +407,19 @@
    }
  }
  private void stop(Exception e)
  {
    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
        .get(stackTraceToSingleLineString(e)));
    thread.initiateShutdown();
    if (replicationServer != null)
    {
      replicationServer.shutdown();
    }
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
@@ -489,10 +506,14 @@
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * Flush is done by chunk sized to 500 messages, starting from the
   * beginning of the list.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   *
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void flush()
  public void flush() throws ChangelogException
  {
    int size;
    int chunksize = Math.min(queueMaxSize, 500);
opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java
New file
@@ -0,0 +1,74 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.DebugLogLevel;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * Utility class for JE.
 */
public final class JEUtils
{
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  private JEUtils()
  {
    // Utility class
  }
  /**
   * Aborts the current transaction. It has no effect if the transaction has
   * committed.
   * <p>
   * This method should only be used after an exception was caught and is about
   * to be rethrown .
   *
   * @param txn
   *          the transaction to abort
   */
  public static void abort(Transaction txn)
  {
    if (txn != null)
    {
      try
      {
        txn.abort();
      }
      catch (DatabaseException ignored)
      {
        // Ignored because code is already throwing an exception
        TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -119,7 +119,7 @@
   * @param baseDN The baseDN of the replication domain.
   * @param replicationServer The ReplicationServer that needs to be shutdown.
   * @param dbenv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened.
   * @throws ChangelogException If a database problem happened
   */
  public ReplicationDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
@@ -184,9 +184,12 @@
  /**
   * add a list of changes to the underlying db.
   *
   * @param changes The list of changes to add to the underlying db.
   * @param changes
   *          The list of changes to add to the underlying db.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void addEntries(List<UpdateMsg> changes)
  public void addEntries(List<UpdateMsg> changes) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -209,7 +212,7 @@
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
      throw new ChangelogException(e);
    }
    finally
    {
@@ -280,9 +283,9 @@
   * @param startCSN
   *          The CSN from which the cursor must start.If null, start from the
   *          oldest CSN
   * @throws ChangelogException
   *           When a problem occurs or the startCSN does not exist.
   * @return The ReplServerDBCursor.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public ReplServerDBCursor openReadCursor(CSN startCSN)
      throws ChangelogException
@@ -320,8 +323,10 @@
   * Read the oldest CSN present in the database.
   *
   * @return the oldest CSN in the DB, null if the DB is empty or closed
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readOldestCSN()
  public CSN readOldestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -362,8 +367,7 @@
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
      return null;
      throw new ChangelogException(e);
    }
    finally
    {
@@ -375,8 +379,10 @@
   * Read the newest CSN present in the database.
   *
   * @return the newest CSN in the DB, null if the DB is empty or closed
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readNewestCSN()
  public CSN readNewestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -419,8 +425,7 @@
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
      return null;
      throw new ChangelogException(e);
    }
    finally
    {
@@ -435,8 +440,10 @@
   *          The CSN from which we start searching.
   * @return the CSN right before the one passed as a parameter. Can return null
   *         if there is none.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN getPreviousCSN(CSN csn)
  public CSN getPreviousCSN(CSN csn) throws ChangelogException
  {
    if (csn == null)
    {
@@ -479,7 +486,7 @@
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
      throw new ChangelogException(e);
    }
    finally
    {
@@ -652,12 +659,12 @@
      }
      catch (ChangelogException e)
      {
        abort(localTxn);
        JEUtils.abort(localTxn);
        throw e;
      }
      catch (Exception e)
      {
        abort(localTxn);
        JEUtils.abort(localTxn);
        throw new ChangelogException(e);
      }
      finally
@@ -669,21 +676,6 @@
      }
    }
    private void abort(Transaction localTxn)
    {
      if (localTxn != null)
      {
        try
        {
          localTxn.abort();
        }
        catch (DatabaseException ignore)
        {
          // Ignore.
        }
      }
    }
    /**
     * Close the ReplicationServer Cursor.
     */
@@ -716,9 +708,9 @@
    }
    /**
     * Abort the Cursor after a Deadlock Exception.
     * This method catch and ignore the DeadlockException because
     * this must be done when aborting a cursor after a DeadlockException
     * Abort the cursor after an Exception.
     * This method catch and ignore the DatabaseException because
     * this must be done when aborting a cursor after a DatabaseException
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
@@ -734,18 +726,7 @@
      }
      closeAndReleaseReadLock(cursor);
      if (txn != null)
      {
        try
        {
          txn.abort();
        }
        catch (DatabaseException e)
        {
          dbenv.shutdownOnException(e);
        }
      }
      JEUtils.abort(txn);
    }
    /**
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -614,30 +614,8 @@
   */
  void shutdownOnException(DatabaseException e)
  {
    innerShutdownOnException(e);
  }
  /**
   * Shuts down replication when an unexpected changelog exception occurs. Note
   * that we do not expect lock timeouts or txn timeouts because the replication
   * databases are deadlock free, thus all operations should complete
   * eventually.
   *
   * @param e
   *          The unexpected changelog exception.
   */
  void shutdownOnException(ChangelogException e)
  {
    innerShutdownOnException(e);
  }
  private void innerShutdownOnException(Exception e)
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
    mb.append(".   ");
    mb.append(stackTraceToSingleLineString(e));
    logError(mb.toMessage());
    logError(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR
        .get(stackTraceToSingleLineString(e)));
    replicationServer.shutdown();
  }