From 7e6c6657bced35f4a3aba723c2add20923450ad6 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Feb 2008 09:40:56 +0000
Subject: [PATCH] The Replication Server thread that is reading changes from the database to propagate them to the Directory Servers was sometimes leaving some Database cursors open.
---
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java | 11 +
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 21 +--
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 158 ++++++++++++++++++++++++-------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 3
opends/src/server/org/opends/server/replication/server/DbHandler.java | 86 +++++++++++-----
5 files changed, 197 insertions(+), 82 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index f3aabe9..f6c2c11 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,6 +51,7 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.DeadlockException;
/**
* This class is used for managing the replicationServer database for each
@@ -99,6 +100,9 @@
final static int MSG_QUEUE_HIMARK = 5000;
final static int MSG_QUEUE_LOWMARK = 4000;
+ // The maximum number of retries in case of DatabaseDeadlock Exception.
+ private static final int DEADLOCK_RETRIES = 10;
+
/**
*
* The trim age in milliseconds. Changes record in the change DB that
@@ -285,7 +289,10 @@
}
}
- return new ReplicationIterator(serverId, db, changeNumber);
+ ReplicationIterator it =
+ new ReplicationIterator(serverId, db, changeNumber);
+
+ return it;
}
/**
@@ -397,46 +404,67 @@
return;
int size = 0;
boolean finished = false;
+ boolean done = false;
ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
(short) 0, (short)0);
- /* the trim is done by group in order to save some CPU and IO bandwidth
- * start the transaction then do a bunch of remove then commit
- */
- ReplServerDBCursor cursor;
+ // In case of deadlock detection by the Database, this thread can
+ // by aborted by a DeadlockException. This is a transient error and
+ // the transaction should be attempted again.
+ // We will try DEADLOCK_RETRIES times before failing.
+ int tries = 0;
+ while ((tries++ < DEADLOCK_RETRIES) && (!done))
+ {
+ /* the trim is done by group in order to save some CPU and IO bandwidth
+ * start the transaction then do a bunch of remove then commit
+ */
+ ReplServerDBCursor cursor;
+ cursor = db.openDeleteCursor();
- cursor = db.openDeleteCursor();
-
- try {
- while ((size < 5000 ) && (!finished))
+ try
{
- ChangeNumber changeNumber = cursor.nextChangeNumber();
- if (changeNumber != null)
+ while ((size < 5000 ) && (!finished))
{
- if ((!changeNumber.equals(lastChange))
- && (changeNumber.older(trimDate)))
+ ChangeNumber changeNumber = cursor.nextChangeNumber();
+ if (changeNumber != null)
{
- size++;
- cursor.delete();
+ if ((!changeNumber.equals(lastChange))
+ && (changeNumber.older(trimDate)))
+ {
+ size++;
+ cursor.delete();
+ }
+ else
+ {
+ firstChange = changeNumber;
+ finished = true;
+ }
}
else
- {
- firstChange = changeNumber;
finished = true;
- }
}
- else
- finished = true;
+ cursor.close();
+ done = true;
}
-
- cursor.close();
- } catch (DatabaseException e)
- {
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
- shutdown = true;
- cursor.close();
- throw (e);
+ catch (DeadlockException e)
+ {
+ cursor.abort();
+ if (tries == DEADLOCK_RETRIES)
+ {
+ // could not handle the Deadlock after DEADLOCK_RETRIES tries.
+ // shutdown the ReplicationServer.
+ shutdown = true;
+ throw (e);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ // mark shutdown for this db so that we don't try again to
+ // stop it from cursor.close() or methods called by cursor.close()
+ shutdown = true;
+ cursor.abort();
+ throw (e);
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index e003d16..63c5c4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -42,6 +42,7 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
+import com.sleepycat.je.DeadlockException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -59,6 +60,9 @@
private Short serverId;
private DN baseDn;
+ // The maximum number of retries in case of DatabaseDeadlock Exception.
+ private static final int DEADLOCK_RETRIES = 10;
+
/**
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
@@ -95,28 +99,49 @@
try
{
- txn = dbenv.beginTransaction();
+ int tries = 0;
+ boolean done = false;
- for (UpdateMessage change : changes)
+ // The database can return a Deadlock Exception if several threads are
+ // accessing the database at the same time. This Exception is a
+ // transient state, when it happens the transaction is aborted and
+ // the operation is attempted again up to DEADLOCK_RETRIES times.
+ while ((tries++ < DEADLOCK_RETRIES) && (!done))
{
- DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
- DatabaseEntry data = new ReplicationData(change);
-
try
{
- db.put(txn, key, data);
- } catch (DatabaseException e)
+ txn = dbenv.beginTransaction();
+
+ for (UpdateMessage change : changes)
+ {
+ DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
+ DatabaseEntry data = new ReplicationData(change);
+ db.put(txn, key, data);
+ }
+
+ txn.commitWriteNoSync();
+ txn = null;
+ done = true;
+ }
+ catch (DeadlockException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ txn.abort();
+ txn = null;
}
}
-
- txn.commitWriteNoSync();
- txn = null;
+ if (!done)
+ {
+ // Could not write to the DB after DEADLOCK_RETRIES tries.
+ // This ReplicationServer is not reliable and will be shutdown.
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ logError(mb.toMessage());
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ replicationServer.shutdown();
+ }
}
catch (DatabaseException e)
{
@@ -332,31 +357,40 @@
{
cursor = db.openCursor(txn, null);
- if (startingChangeNumber != null)
+ try
{
- key = new ReplicationKey(startingChangeNumber);
- data = new DatabaseEntry();
-
- if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
+ if (startingChangeNumber != null)
{
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+ key = new ReplicationKey(startingChangeNumber);
+ data = new DatabaseEntry();
+
+ if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
+ if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+ OperationStatus.SUCCESS)
+ {
throw new Exception("ChangeNumber not available");
- }
- else
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
- {
- cursor = db.openCursor(txn, null);
- }
+ }
+ else
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+ OperationStatus.SUCCESS)
+ {
+ cursor.close();
+ cursor = db.openCursor(txn, null);
+ }
+ }
}
}
}
+ catch (Exception e)
+ {
+ cursor.close();
+ throw (e);
+ }
}
private ReplServerDBCursor() throws DatabaseException
@@ -370,13 +404,15 @@
*/
public void close()
{
- if (cursor == null)
- return;
try
{
- cursor.close();
- cursor = null;
- } catch (DatabaseException e)
+ if (cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+ }
+ catch (DatabaseException e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
@@ -401,6 +437,52 @@
}
/**
+ * Abort the Cursor after a Deadlock Exception.
+ * This method catch and ignore the DeadlockException because
+ * this must be done when aborting a cursor after a DeadlockException
+ * (per the Cursor documentation).
+ * This should not be used in any other case.
+ */
+ public void abort()
+ {
+ if (cursor == null)
+ return;
+ try
+ {
+ cursor.close();
+ cursor = null;
+ }
+ catch (DeadlockException e1)
+ {
+ // The DB documentation states that a DeadlockException
+ // on the close method of a cursor that is aborting should
+ // be ignored.
+ }
+ catch (DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ }
+ if (txn != null)
+ {
+ try
+ {
+ txn.abort();
+ } catch (DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ }
+ }
+ }
+
+ /**
* Get the next ChangeNumber in the database from this Cursor.
*
* @return The next ChangeNumber in the database from this cursor.
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index a65f61c..6c078d4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -22,16 +22,16 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
-import com.sleepycat.je.DatabaseException;
-
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
+import com.sleepycat.je.DatabaseException;
+
/**
* This class allows to iterate through the changes received from a given
* LDAP Server Identifier.
@@ -43,6 +43,9 @@
/**
* Creates a new ReplicationIterator.
+ * All created iterator must be released by the caller using the
+ * releaseCursor() method.
+ *
* @param id the Identifier of the server on which the iterator applies.
* @param db The db where the iterator must be created.
* @param changeNumber The ChangeNumber after which the iterator must start.
@@ -56,7 +59,9 @@
{
cursor = db.openReadCursor(changeNumber);
if (cursor == null)
+ {
throw new Exception("no new change");
+ }
if (this.next() == false)
{
cursor.close();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4d9cf4c..89726d4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -572,6 +572,9 @@
/**
* Creates and returns an iterator.
+ * When the iterator is not used anymore, the caller MUST call the
+ * ReplicationIterator.releaseCursor() method to free the ressources
+ * and locks used by the ReplicationIterator.
*
* @param serverId Identifier of the server for which the iterator is created.
* @param changeNumber Starting point for the iterator.
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index d624741..3b85cb1 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -857,16 +857,6 @@
if (olderUpdateCN == null)
return null;
- ReplicationIterator ri =
- replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
- if (ri != null)
- {
- if (ri.next())
- {
- ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
- return firstMissingChange.getTime();
- }
- }
return olderUpdateCN.getTime();
}
@@ -1081,9 +1071,16 @@
ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
ReplicationIterator iterator =
replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- if ((iterator != null) && (iterator.getChange() != null))
+ if (iterator != null)
{
- iteratorSortedSet.add(iterator);
+ if (iterator.getChange() != null)
+ {
+ iteratorSortedSet.add(iterator);
+ }
+ else
+ {
+ iterator.releaseCursor();
+ }
}
}
while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
--
Gitblit v1.10.0