From 6b1e3bf06de1327d05b8cbefcd930e5974f556d3 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 04 Apr 2011 22:33:36 +0000
Subject: [PATCH] OpenDJ-107: Potential for leaking DB cursors in replication databases
---
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 422 +++++++++++++++++++++++++++-------------------------
1 files changed, 216 insertions(+), 206 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 7330fac..b512b7e 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -30,6 +30,8 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.util.StaticUtils.decodeUTF8;
+import static org.opends.server.util.StaticUtils.getBytes;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
@@ -37,8 +39,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
+
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.zip.DataFormatException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -144,12 +146,12 @@
// Initialize counter
this.counterCurrValue = 1;
cursor = db.openCursor(txn, null);
- status = cursor.getLast(key, data, LockMode.DEFAULT);
- while (status == OperationStatus.SUCCESS)
+ try
{
- try
+ status = cursor.getLast(key, data, LockMode.DEFAULT);
+ while (status == OperationStatus.SUCCESS)
{
- ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
+ ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
if (!ReplicationDB.isaCounter(cn))
{
status = cursor.getPrev(key, data, LockMode.DEFAULT);
@@ -158,38 +160,17 @@
else
{
// counter record
- counterCurrValue = decodeCounterValue(data.getData())+1;
+ counterCurrValue = decodeCounterValue(data.getData()) + 1;
counterTsLimit = cn.getTime();
break;
}
}
- catch (UnsupportedEncodingException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shuting down.
- }
- }
- replicationServer.shutdown();
- }
- catch (DataFormatException e)
- {
- // Should never happen
- }
+ counterCurrValue += distBackToCounterRecord;
}
- counterCurrValue += distBackToCounterRecord;
- cursor.close();
-
+ finally
+ {
+ cursor.close();
+ }
}
/**
@@ -377,55 +358,38 @@
String str = null;
ChangeNumber cn = null;
+ dbCloseLock.readLock().lock();
try
{
- dbCloseLock.readLock().lock();
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
cursor = db.openCursor(null, null);
- }
- catch (DatabaseException e1)
- {
- dbCloseLock.readLock().unlock();
- return null;
- }
- try
- {
- try
+
+ OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+
+ if (status != OperationStatus.SUCCESS)
{
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+ /* database is empty */
+ return null;
+ }
+
+ str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ // First record is a counter record .. go next
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
if (status != OperationStatus.SUCCESS)
{
- /* database is empty */
+ // DB contains only a counter record
return null;
}
- try
+ else
{
- str = new String(key.getData(), "UTF-8");
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- // First record is a counter record .. go next
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- // DB contains only a counter record
- return null;
- }
- else
- {
- cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
- }
- }
- } catch (UnsupportedEncodingException e)
- {
- // never happens
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
}
}
- finally
- {
- closeLockedCursor(cursor);
- }
}
catch (DatabaseException e)
{
@@ -437,11 +401,18 @@
replicationServer.shutdown();
cn = null;
}
+ finally
+ {
+ closeLockedCursor(cursor);
+ }
return cn;
}
+
+
/**
* Read the last Change from the database.
+ *
* @return the last ChangeNumber.
*/
public ChangeNumber readLastChange()
@@ -449,43 +420,36 @@
Cursor cursor = null;
ChangeNumber cn = null;
+ dbCloseLock.readLock().lock();
try
{
- dbCloseLock.readLock().lock();
- try
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
+ cursor = db.openCursor(null, null);
+
+ OperationStatus status = cursor.getLast(key, data,
+ LockMode.DEFAULT);
+
+ if (status != OperationStatus.SUCCESS)
{
- cursor = db.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- /* database is empty */
- return null;
- }
- try
- {
- String str = new String(key.getData(), "UTF-8");
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
- {
- /* database only contain a counter record - don't know
- * how much it can be possible but ... */
- cn = null;
- }
- }
- }
- catch (UnsupportedEncodingException e)
- {
- // never happens
- }
+ /* database is empty */
+ return null;
}
- finally
+
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
{
- closeLockedCursor(cursor);
+ if (cursor.getPrev(key, data,
+ LockMode.DEFAULT) != OperationStatus.SUCCESS)
+ {
+ /*
+ * database only contain a counter record - don't know how much it can
+ * be possible but ...
+ */
+ cn = null;
+ }
}
}
catch (DatabaseException e)
@@ -497,6 +461,11 @@
replicationServer.shutdown();
cn = null;
}
+ finally
+ {
+ closeLockedCursor(cursor);
+ }
+
return cn;
}
@@ -515,44 +484,56 @@
*/
public class ReplServerDBCursor
{
- private Cursor cursor = null;
-
- // The transaction that will protect the actions done with the cursor
+ // The transaction that will protect the actions done with the cursor
// Will be let null for a read cursor
// Will be set non null for a write cursor
- private Transaction txn = null;
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
+ private final Transaction txn;
+ private final Cursor cursor;
+ private final DatabaseEntry key;
+ private final DatabaseEntry data;
+
+ private boolean isClosed = false;
/**
* Creates a ReplServerDBCursor that can be used for browsing a
* replicationServer db.
*
- * @param startingChangeNumber The ChangeNumber from which the cursor must
- * start.
- * @throws Exception When the startingChangeNumber does not exist.
+ * @param startingChangeNumber
+ * The ChangeNumber from which the cursor must start.
+ * @throws Exception
+ * When the startingChangeNumber does not exist.
*/
private ReplServerDBCursor(ChangeNumber startingChangeNumber)
- throws Exception
+ throws Exception
{
+ if (startingChangeNumber != null)
+ {
+ key = new ReplicationKey(startingChangeNumber);
+ }
+ else
+ {
+ key = new DatabaseEntry();
+ }
+ data = new DatabaseEntry();
+
+ txn = null;
+
+ // Take the lock. From now on, whatever error that happen in the life
+ // of this cursor should end by unlocking that lock. We must also
+ // unlock it when throwing an exception.
+ dbCloseLock.readLock().lock();
+
+ Cursor localCursor = null;
try
{
- // Take the lock. From now on, whatever error that happen in the life
- // of this cursor should end by unlocking that lock. We must also
- // unlock it when throwing an exception.
- dbCloseLock.readLock().lock();
-
- cursor = db.openCursor(txn, null);
+ localCursor = db.openCursor(txn, null);
if (startingChangeNumber != null)
{
- key = new ReplicationKey(startingChangeNumber);
- data = new DatabaseEntry();
-
- if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
+ if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
// We could not move the cursor to the expected startingChangeNumber
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+ if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
// We could not even move the cursor closed to it => failure
@@ -564,51 +545,75 @@
// Let's create a cursor from that point.
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+ if (localCursor.getPrev(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
- closeLockedCursor(cursor);
- dbCloseLock.readLock().lock();
- cursor = db.openCursor(txn, null);
+ localCursor.close();
+ localCursor = db.openCursor(txn, null);
}
}
}
}
+ cursor = localCursor;
}
catch (Exception e)
{
- // Unlocking is required before throwing any exception
- closeLockedCursor(cursor);
- throw (e);
+ // Unlocking is required before throwing any exception
+ try
+ {
+ closeLockedCursor(localCursor);
+ }
+ catch (Exception ignore)
+ {
+ // Ignore.
+ }
+ throw e;
}
}
- private ReplServerDBCursor() throws DatabaseException
+ private ReplServerDBCursor() throws Exception
{
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+
+ // We'll go on only if no close or no clear is running
+ dbCloseLock.readLock().lock();
+
+ Transaction localTxn = null;
+ Cursor localCursor = null;
try
{
- // We'll go on only if no close or no clear is running
- dbCloseLock.readLock().lock();
-
// Create the transaction that will protect whatever done with this
// write cursor.
- txn = dbenv.beginTransaction();
+ localTxn = dbenv.beginTransaction();
+ localCursor = db.openCursor(localTxn, null);
- cursor = db.openCursor(txn, null);
+ txn = localTxn;
+ cursor = localCursor;
}
- catch(DatabaseException e)
+ catch (Exception e)
{
- if (txn != null)
+ try
+ {
+ closeLockedCursor(localCursor);
+ }
+ catch (Exception ignore)
+ {
+ // Ignore.
+ }
+
+ if (localTxn != null)
{
try
{
- txn.abort();
+ localTxn.abort();
}
- catch (DatabaseException dbe)
- {}
+ catch (DatabaseException ignore)
+ {
+ // Ignore.
+ }
}
- closeLockedCursor(cursor);
- throw (e);
+ throw e;
}
}
@@ -617,10 +622,20 @@
*/
public void close()
{
+ synchronized (this)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+ }
+
+ boolean closeHasFailed = false;
+
try
{
closeLockedCursor(cursor);
- cursor = null;
}
catch (DatabaseException e)
{
@@ -628,22 +643,29 @@
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
+
if (txn != null)
{
try
{
txn.commit();
- } catch (DatabaseException e)
+ }
+ catch (DatabaseException e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
}
+
+ if (closeHasFailed)
+ {
+ replicationServer.shutdown();
+ }
}
/**
@@ -655,14 +677,22 @@
*/
public void abort()
{
- if (cursor == null)
- return;
+ synchronized (this)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+ }
+
+ boolean closeHasFailed = false;
+
try
{
closeLockedCursor(cursor);
- cursor = null;
}
- catch (LockConflictException e1)
+ catch (LockConflictException e)
{
// The DB documentation states that a DeadlockException
// on the close method of a cursor that is aborting should
@@ -674,22 +704,29 @@
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
+
if (txn != null)
{
try
{
txn.abort();
- } catch (DatabaseException e)
+ }
+ catch (DatabaseException e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
}
+
+ if (closeHasFailed)
+ {
+ replicationServer.shutdown();
+ }
}
/**
@@ -706,15 +743,8 @@
{
return null;
}
- try
- {
- String csnString = new String(key.getData(), "UTF-8");
- return new ChangeNumber(csnString);
- } catch (UnsupportedEncodingException e)
- {
- // can't happen
- return null;
- }
+ String csnString = decodeUTF8(key.getData());
+ return new ChangeNumber(csnString);
}
/**
@@ -738,26 +768,29 @@
{
return null;
}
+
try
{
- ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
- if(ReplicationDB.isaCounter(cn))
+ ChangeNumber cn = new ChangeNumber(
+ decodeUTF8(key.getData()));
+ if (ReplicationDB.isaCounter(cn))
{
// counter record
continue;
}
- currentChange = ReplicationData.generateChange(data.getData());
- } catch (Exception e) {
+ currentChange = ReplicationData.generateChange(data
+ .getData());
+ }
+ catch (Exception e)
+ {
/*
* An error happening trying to convert the data from the
- * replicationServer database to an Update Message.
- * This can only happen if the database is corrupted.
- * There is not much more that we can do at this point except trying
- * to continue with the next record.
- * In such case, it is therefore possible that we miss some changes.
- * TODO. log an error message.
- * TODO : REPAIR : Such problem should be handled by the
- * repair functionality.
+ * replicationServer database to an Update Message. This can only
+ * happen if the database is corrupted. There is not much more that we
+ * can do at this point except trying to continue with the next
+ * record. In such case, it is therefore possible that we miss some
+ * changes. TODO. log an error message. TODO : REPAIR : Such problem
+ * should be handled by the repair functionality.
*/
}
}
@@ -859,7 +892,7 @@
while (status == OperationStatus.SUCCESS)
{
// test whether the record is a regular change or a counter
- String csnString = new String(key.getData(), "UTF-8");
+ String csnString = decodeUTF8(key.getData());
cn = new ChangeNumber(csnString);
if (cn.getServerId() != 0)
{
@@ -900,7 +933,7 @@
status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
}
else
{
@@ -915,7 +948,7 @@
}
while (status == OperationStatus.SUCCESS)
{
- cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
if (!ReplicationDB.isaCounter(cn))
{
// regular change record
@@ -952,18 +985,6 @@
}
}
}
- catch (UnsupportedEncodingException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- }
- catch (DataFormatException e)
- {
- // Should never happen
- }
finally
{
if (cursor != null)
@@ -975,7 +996,7 @@
txn.abort();
} catch (DatabaseException e1)
{
- // can't do much more. The ReplicationServer is shuting down.
+ // can't do much more. The ReplicationServer is shutting down.
}
}
}
@@ -996,33 +1017,22 @@
* Decode the provided database entry as a the value of a counter.
* @param entry The provided entry.
* @return The counter value.
- * @throws DataFormatException
*/
private static int decodeCounterValue(byte[] entry)
- throws DataFormatException
{
- try
- {
- String numAckStr = new String(entry, 0, entry.length, "UTF-8");
- return Integer.parseInt(numAckStr);
-
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ String numAckStr = decodeUTF8(entry);
+ return Integer.parseInt(numAckStr);
}
/**
* Encode the provided counter value in a database entry.
* @param entry The provided entry.
- * @return The databse entry with the counter value encoded inside..
- * @throws UnsupportedEncodingException
+ * @return The database entry with the counter value encoded inside.
*/
static private DatabaseEntry encodeCounterValue(int value)
- throws UnsupportedEncodingException
{
DatabaseEntry entry = new DatabaseEntry();
- entry.setData(String.valueOf(value).getBytes("UTF-8"));
+ entry.setData(getBytes(String.valueOf(value)));
return entry;
}
--
Gitblit v1.10.0