From 407101fb21106bb8697aa771826638a41b968f3a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Dec 2013 16:30:13 +0000
Subject: [PATCH] OPENDJ-1172 Deadlock between replication threads during shutdown
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 63 +++++++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java | 74 ++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 26 ----
opendj-sdk/opends/src/messages/messages/replication.properties | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java | 81 ++++---------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 7
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 73 ++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 28 +---
8 files changed, 184 insertions(+), 174 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 5d66ce7..09ebfc1 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -77,13 +77,13 @@
%s : %s
SEVERE_ERR_EXCEPTION_DECODING_OPERATION_25=Error trying to replay %s, \
operation could not be decoded :
-FATAL_ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR_26=Error Trying to use the \
- underlying database. The Replication Server is going to shut down
+FATAL_ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR_26=Error trying to use the \
+ underlying database. The Replication Server is going to shut down: %s
SEVERE_ERR_ERROR_CLOSING_CHANGELOG_ENV_28=Error closing the Replication Server \
database : %s
SEVERE_ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH_29=Error during the Replication \
Server database trimming or flush process. The Changelog service is going to \
- shutdown
+ shutdown: %s
SEVERE_ERR_WRITER_UNEXPECTED_EXCEPTION_32=An unexpected error happened \
handling connection with %s. This connection is going to be closed
SEVERE_ERR_RS_ERROR_SENDING_ACK_33=In replication server %s: an unexpected error \
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index e8a9410..9121ec8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -429,11 +429,8 @@
* job properly anymore and needs to close all its connections and
* shutdown itself.
*/
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
+ logError(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR
+ .get(stackTraceToSingleLineString(e)));
localReplicationServer.shutdown();
return false;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
index cbdd813..1553f69 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -67,7 +67,7 @@
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
* @param dbenv The Db environment to use to create the db.
- * @throws ChangelogException If a database problem happened.
+ * @throws ChangelogException If a database problem happened
*/
public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException
{
@@ -82,8 +82,11 @@
*
* @param record
* the provided {@link ChangeNumberIndexRecord} to be stored.
+ * @throws ChangelogException
+ * If a database problem happened
*/
public void addRecord(ChangeNumberIndexRecord record)
+ throws ChangelogException
{
try
{
@@ -110,40 +113,13 @@
}
finally
{
- abort(txn);
+ JEUtils.abort(txn);
dbCloseLock.readLock().unlock();
}
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
- }
- catch (ChangelogException e)
- {
- dbenv.shutdownOnException(e);
- }
- }
-
- /**
- * Aborts the current transaction. It has no effect if the transaction has
- * committed.
- *
- * @param txn
- * the transaction to abort
- */
- private static void abort(Transaction txn)
- {
- if (txn != null)
- {
- try
- {
- txn.abort();
- }
- catch (DatabaseException ignored)
- {
- // Ignore.
- TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
- }
+ throw new ChangelogException(e);
}
}
@@ -173,9 +149,9 @@
* Create a cursor that can be used to search or iterate on this DB.
*
* @param changeNumber The change number from which the cursor must start.
+ * @return The DraftCNDBCursor
* @throws ChangelogException If a database error prevented the cursor
* creation.
- * @return The ReplServerDBCursor.
*/
public DraftCNDBCursor openReadCursor(long changeNumber)
throws ChangelogException
@@ -187,9 +163,9 @@
* Create a cursor that can be used to delete some record from this
* ReplicationServer database.
*
+ * @return The DraftCNDBCursor
* @throws ChangelogException If a database error prevented the cursor
* creation.
- * @return The ReplServerDBCursor.
*/
public DraftCNDBCursor openDeleteCursor() throws ChangelogException
{
@@ -246,8 +222,7 @@
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
- return null;
+ throw new ChangelogException(e);
}
}
@@ -323,8 +298,7 @@
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
- return null;
+ throw new ChangelogException(e);
}
}
@@ -360,7 +334,7 @@
* @param startChangeNumber
* the change number from which the cursor must start.
* @throws ChangelogException
- * when the startChangeNumber does not exist.
+ * If a database problem happened
*/
private DraftCNDBCursor(long startChangeNumber) throws ChangelogException
{
@@ -469,13 +443,13 @@
catch (DatabaseException e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- DraftCNDB.abort(localTxn);
+ JEUtils.abort(localTxn);
throw new ChangelogException(e);
}
catch (ChangelogException e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- DraftCNDB.abort(localTxn);
+ JEUtils.abort(localTxn);
throw e;
}
finally
@@ -521,9 +495,9 @@
}
/**
- * Abort the Cursor after a Deadlock Exception.
- * This method catch and ignore the DeadlockException because
- * this must be done when aborting a cursor after a DeadlockException
+ * Abort the Cursor after a DatabaseException.
+ * This method catches and ignores the DatabaseException because
+ * this must be done when aborting a cursor after a DatabaseException
* (per the Cursor documentation).
* This should not be used in any other case.
*/
@@ -539,18 +513,7 @@
}
closeLockedCursor(cursor);
-
- if (txn != null)
- {
- try
- {
- txn.abort();
- }
- catch (DatabaseException e)
- {
- dbenv.shutdownOnException(e);
- }
- }
+ JEUtils.abort(txn);
}
/**
@@ -570,8 +533,10 @@
/**
* Go to the next record on the cursor.
+ *
* @return the next record on this cursor.
- * @throws ChangelogException a.
+ * @throws ChangelogException
+ * If a database problem happened
*/
public boolean next() throws ChangelogException
{
@@ -600,7 +565,8 @@
/**
* Delete the record at the current cursor position.
*
- * @throws ChangelogException In case of database problem.
+ * @throws ChangelogException
+ * If a database problem happened
*/
public void delete() throws ChangelogException
{
@@ -630,7 +596,8 @@
/**
* Clears this change DB from the changes it contains.
*
- * @throws ChangelogException Throws a DatabaseException when it occurs.
+ * @throws ChangelogException
+ * If a database problem happened
*/
public void clear() throws ChangelogException
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index f0bc953..166accf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -283,15 +282,15 @@
Thread.currentThread().interrupt();
}
}
- } catch (Exception end)
+ }
+ catch (Exception end)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(end));
- logError(mb.toMessage());
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(end)));
if (replicationServer != null)
+ {
replicationServer.shutdown();
+ }
break;
}
try {
@@ -309,13 +308,12 @@
}
} catch (Exception end)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(end));
- logError(mb.toMessage());
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(end)));
if (replicationServer != null)
+ {
replicationServer.shutdown();
+ }
break;
}
}
@@ -454,18 +452,12 @@
}
catch (ChangelogException e)
{
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- shutdown.set(true);
throw e;
}
catch (Exception e)
{
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- shutdown.set(true);
throw new ChangelogException(e);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index d9779e6..b85d132 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -31,7 +31,6 @@
import java.util.LinkedList;
import java.util.List;
-import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -270,7 +269,7 @@
* @return a new {@link DBCursor} that allows to browse the db managed by this
* ReplicaDB and starting at the position defined by a given CSN.
* @throws ChangelogException
- * if a database problem happened.
+ * if a database problem happened
*/
public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
throws ChangelogException
@@ -325,7 +324,16 @@
while (msgQueue.size() != 0)
{
- flush();
+ try
+ {
+ flush();
+ }
+ catch (ChangelogException e)
+ {
+ // We are already shutting down
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(e)));
+ }
}
db.shutdown();
@@ -372,25 +380,21 @@
}
catch (Exception end)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(end));
- logError(mb.toMessage());
-
- thread.initiateShutdown();
-
- if (replicationServer != null)
- {
- replicationServer.shutdown();
- }
+ stop(end);
break;
}
}
- // call flush a last time before exiting to make sure that
- // no change was forgotten in the msgQueue
- flush();
+ try
+ {
+ // call flush a last time before exiting to make sure that
+ // no change was forgotten in the msgQueue
+ flush();
+ }
+ catch (ChangelogException e)
+ {
+ stop(e);
+ }
}
finally
{
@@ -403,6 +407,19 @@
}
}
+ private void stop(Exception e)
+ {
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(e)));
+
+ thread.initiateShutdown();
+
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ }
+
/**
* Retrieves the latest trim date.
* @return the latest trim date.
@@ -489,10 +506,14 @@
/**
* Flush a number of updates from the memory list to the stable storage.
- * Flush is done by chunk sized to 500 messages, starting from the
- * beginning of the list.
+ * <p>
+ * Flush is done by chunk sized to 500 messages, starting from the beginning
+ * of the list.
+ *
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void flush()
+ public void flush() throws ChangelogException
{
int size;
int chunksize = Math.min(queueMaxSize, 500);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java
new file mode 100644
index 0000000..c8c831b
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java
@@ -0,0 +1,74 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.types.DebugLogLevel;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+/**
+ * Utility methods shared by the Berkeley DB Java Edition (JE) changelog code.
+ */
+public final class JEUtils
+{
+
+ /** The tracer object for the debug logger. */
+ private static final DebugTracer TRACER = getTracer();
+
+ private JEUtils()
+ {
+ // Utility class: private constructor prevents instantiation
+ }
+
+ /**
+ * Aborts the current transaction. It has no effect if the transaction has
+ * committed.
+ * <p>
+ * This method should only be used after an exception was caught and is about
+ * to be rethrown.
+ *
+ * @param txn
+ * the transaction to abort; nothing is done when it is {@code null}
+ */
+ public static void abort(Transaction txn)
+ {
+ if (txn != null)
+ {
+ try
+ {
+ txn.abort();
+ }
+ catch (DatabaseException ignored)
+ {
+ // Not rethrown because the calling code is already throwing an exception
+ TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+ }
+ }
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 0fcb368..afe3adc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -119,7 +119,7 @@
* @param baseDN The baseDN of the replication domain.
* @param replicationServer The ReplicationServer that needs to be shutdown.
* @param dbenv The Db environment to use to create the db.
- * @throws ChangelogException If a database problem happened.
+ * @throws ChangelogException If a database problem happened
*/
public ReplicationDB(int serverId, DN baseDN,
ReplicationServer replicationServer, ReplicationDbEnv dbenv)
@@ -184,9 +184,12 @@
/**
* add a list of changes to the underlying db.
*
- * @param changes The list of changes to add to the underlying db.
+ * @param changes
+ * The list of changes to add to the underlying db.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void addEntries(List<UpdateMsg> changes)
+ public void addEntries(List<UpdateMsg> changes) throws ChangelogException
{
dbCloseLock.readLock().lock();
try
@@ -209,7 +212,7 @@
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
+ throw new ChangelogException(e);
}
finally
{
@@ -280,9 +283,9 @@
* @param startCSN
* The CSN from which the cursor must start.If null, start from the
* oldest CSN
- * @throws ChangelogException
- * When a problem occurs or the startCSN does not exist.
* @return The ReplServerDBCursor.
+ * @throws ChangelogException
+ * If a database problem happened
*/
public ReplServerDBCursor openReadCursor(CSN startCSN)
throws ChangelogException
@@ -320,8 +323,10 @@
* Read the oldest CSN present in the database.
*
* @return the oldest CSN in the DB, null if the DB is empty or closed
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public CSN readOldestCSN()
+ public CSN readOldestCSN() throws ChangelogException
{
dbCloseLock.readLock().lock();
@@ -362,8 +367,7 @@
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
- return null;
+ throw new ChangelogException(e);
}
finally
{
@@ -375,8 +379,10 @@
* Read the newest CSN present in the database.
*
* @return the newest CSN in the DB, null if the DB is empty or closed
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public CSN readNewestCSN()
+ public CSN readNewestCSN() throws ChangelogException
{
dbCloseLock.readLock().lock();
@@ -419,8 +425,7 @@
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
- return null;
+ throw new ChangelogException(e);
}
finally
{
@@ -435,8 +440,10 @@
* The CSN from which we start searching.
* @return the CSN right before the one passed as a parameter. Can return null
* if there is none.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public CSN getPreviousCSN(CSN csn)
+ public CSN getPreviousCSN(CSN csn) throws ChangelogException
{
if (csn == null)
{
@@ -479,7 +486,7 @@
}
catch (DatabaseException e)
{
- dbenv.shutdownOnException(e);
+ throw new ChangelogException(e);
}
finally
{
@@ -652,12 +659,12 @@
}
catch (ChangelogException e)
{
- abort(localTxn);
+ JEUtils.abort(localTxn);
throw e;
}
catch (Exception e)
{
- abort(localTxn);
+ JEUtils.abort(localTxn);
throw new ChangelogException(e);
}
finally
@@ -669,21 +676,6 @@
}
}
- private void abort(Transaction localTxn)
- {
- if (localTxn != null)
- {
- try
- {
- localTxn.abort();
- }
- catch (DatabaseException ignore)
- {
- // Ignore.
- }
- }
- }
-
/**
* Close the ReplicationServer Cursor.
*/
@@ -716,9 +708,9 @@
}
/**
- * Abort the Cursor after a Deadlock Exception.
- * This method catch and ignore the DeadlockException because
- * this must be done when aborting a cursor after a DeadlockException
+ * Abort the cursor after an Exception.
+ * This method catches and ignores the DatabaseException because
+ * this must be done when aborting a cursor after a DatabaseException
* (per the Cursor documentation).
* This should not be used in any other case.
*/
@@ -734,18 +726,7 @@
}
closeAndReleaseReadLock(cursor);
-
- if (txn != null)
- {
- try
- {
- txn.abort();
- }
- catch (DatabaseException e)
- {
- dbenv.shutdownOnException(e);
- }
- }
+ JEUtils.abort(txn);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index a4539a7..885af0a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -614,30 +614,8 @@
*/
void shutdownOnException(DatabaseException e)
{
- innerShutdownOnException(e);
- }
-
- /**
- * Shuts down replication when an unexpected changelog exception occurs. Note
- * that we do not expect lock timeouts or txn timeouts because the replication
- * databases are deadlock free, thus all operations should complete
- * eventually.
- *
- * @param e
- * The unexpected changelog exception.
- */
- void shutdownOnException(ChangelogException e)
- {
- innerShutdownOnException(e);
- }
-
- private void innerShutdownOnException(Exception e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(". ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
+ logError(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR
+ .get(stackTraceToSingleLineString(e)));
replicationServer.shutdown();
}
--
Gitblit v1.10.0