From 7e6c6657bced35f4a3aba723c2add20923450ad6 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Feb 2008 09:40:56 +0000
Subject: [PATCH] The Replication Server thread that is reading changes from the database to propagate them to the Directory Servers was sometimes leaving some Database cursors open.
---
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 158 ++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 120 insertions(+), 38 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index e003d16..63c5c4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -42,6 +42,7 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
+import com.sleepycat.je.DeadlockException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -59,6 +60,9 @@
private Short serverId;
private DN baseDn;
+ // The maximum number of retries in case of a DeadlockException.
+ private static final int DEADLOCK_RETRIES = 10;
+
/**
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
@@ -95,28 +99,49 @@
try
{
- txn = dbenv.beginTransaction();
+ int tries = 0;
+ boolean done = false;
- for (UpdateMessage change : changes)
+ // The database can throw a DeadlockException if several threads are
+ // accessing the database at the same time. This exception signals a
+ // transient state: when it happens, the transaction is aborted and
+ // the operation is attempted again, up to DEADLOCK_RETRIES times.
+ while ((tries++ < DEADLOCK_RETRIES) && (!done))
{
- DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
- DatabaseEntry data = new ReplicationData(change);
-
try
{
- db.put(txn, key, data);
- } catch (DatabaseException e)
+ txn = dbenv.beginTransaction();
+
+ for (UpdateMessage change : changes)
+ {
+ DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
+ DatabaseEntry data = new ReplicationData(change);
+ db.put(txn, key, data);
+ }
+
+ txn.commitWriteNoSync();
+ txn = null;
+ done = true;
+ }
+ catch (DeadlockException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ txn.abort();
+ txn = null;
}
}
-
- txn.commitWriteNoSync();
- txn = null;
+ if (!done)
+ {
+ // Could not write to the DB after DEADLOCK_RETRIES tries.
+ // This ReplicationServer is not reliable and will be shut down.
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ logError(mb.toMessage());
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ replicationServer.shutdown();
+ }
}
catch (DatabaseException e)
{
@@ -332,31 +357,40 @@
{
cursor = db.openCursor(txn, null);
- if (startingChangeNumber != null)
+ try
{
- key = new ReplicationKey(startingChangeNumber);
- data = new DatabaseEntry();
-
- if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
+ if (startingChangeNumber != null)
{
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+ key = new ReplicationKey(startingChangeNumber);
+ data = new DatabaseEntry();
+
+ if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
+ if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+ OperationStatus.SUCCESS)
+ {
throw new Exception("ChangeNumber not available");
- }
- else
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
- {
- cursor = db.openCursor(txn, null);
- }
+ }
+ else
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+ OperationStatus.SUCCESS)
+ {
+ cursor.close();
+ cursor = db.openCursor(txn, null);
+ }
+ }
}
}
}
+ catch (Exception e)
+ {
+ cursor.close();
+ throw (e);
+ }
}
private ReplServerDBCursor() throws DatabaseException
@@ -370,13 +404,15 @@
*/
public void close()
{
- if (cursor == null)
- return;
try
{
- cursor.close();
- cursor = null;
- } catch (DatabaseException e)
+ if (cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+ }
+ catch (DatabaseException e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
@@ -401,6 +437,52 @@
}
/**
+ * Abort the Cursor after a DeadlockException.
+ * This method catches and ignores the DeadlockException because
+ * this must be done when aborting a cursor after a DeadlockException
+ * (per the Cursor documentation).
+ * This should not be used in any other case.
+ */
+ public void abort()
+ {
+ if (cursor == null)
+ return;
+ try
+ {
+ cursor.close();
+ cursor = null;
+ }
+ catch (DeadlockException e1)
+ {
+ // The DB documentation states that a DeadlockException
+ // on the close method of a cursor that is aborting should
+ // be ignored.
+ }
+ catch (DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ }
+ if (txn != null)
+ {
+ try
+ {
+ txn.abort();
+ } catch (DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ }
+ }
+ }
+
+ /**
* Get the next ChangeNumber in the database from this Cursor.
*
* @return The next ChangeNumber in the database from this cursor.
--
Gitblit v1.10.0