From 246e4192d3967e638aad1f12adc3e36be2aa82e2 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 04 Apr 2011 22:33:36 +0000
Subject: [PATCH] OpenDJ-107: Potential for leaking DB cursors in replication databases
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 422 +++++++++++++------------
opendj-sdk/opends/src/server/org/opends/server/util/StaticUtils.java | 64 +++
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 82 +++-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java | 18 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java | 84 +++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 27 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java | 219 ++++++++-----
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 13
11 files changed, 551 insertions(+), 392 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 1ee47a7..43cdff7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -478,8 +478,7 @@
/* the trim is done by group in order to save some CPU and IO bandwidth
* start the transaction then do a bunch of remove then commit
*/
- ReplServerDBCursor cursor;
- cursor = db.openDeleteCursor();
+ ReplServerDBCursor cursor = db.openDeleteCursor();
try
{
@@ -517,7 +516,7 @@
throw (e);
}
}
- catch (DatabaseException e)
+ catch (Exception e)
{
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index fa113b4..f42a956 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -29,6 +29,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
@@ -264,14 +265,7 @@
/* database is empty */
return 0;
}
- try
- {
- str = new String(key.getData(), "UTF-8");
- } catch (UnsupportedEncodingException e)
- {
- // never happens, return anyway
- return 0;
- }
+ str = decodeUTF8(key.getData());
int sn = new Integer(str);
return sn;
}
@@ -341,14 +335,7 @@
/* database is empty */
return 0;
}
- try
- {
- str = new String(key.getData(), "UTF-8");
- } catch (UnsupportedEncodingException e)
- {
- // never happens, returns anyway
- return 0;
- }
+ str = decodeUTF8(key.getData());
int sn = new Integer(str);
return sn;
}
@@ -387,47 +374,57 @@
*/
public class DraftCNDBCursor
{
- private Cursor cursor = null;
+ private final Cursor cursor;
// The transaction that will protect the actions done with the cursor
// Will be let null for a read cursor
// Will be set non null for a write cursor
- private Transaction txn = null;
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry entry = new DatabaseEntry();
+ private final Transaction txn;
+ private final DatabaseEntry key;
+ private final DatabaseEntry entry;
+
+ private boolean isClosed = false;
+
+
/**
* Creates a cursor that can be used for browsing the db.
*
- * @param startingDraftCN the draftCN from which the cursor must
- * start.
- * @throws Exception when the startingDraftCN does not exist.
+ * @param startingDraftCN
+ * the draftCN from which the cursor must start.
+ * @throws Exception
+ * when the startingDraftCN does not exist.
*/
private DraftCNDBCursor(int startingDraftCN) throws Exception
{
+ // For consistency with other constructor, we'll use a local here,
+ // even though it's always null.
+ final Transaction localTxn = null;
+ Cursor localCursor = null;
+
+ this.key = new ReplicationDraftCNKey(startingDraftCN);
+ this.entry = new DatabaseEntry();
+
+ // Take the lock. From now on, whatever error that happen in the life
+ // of this cursor should end by unlocking that lock. We must also
+ // unlock it when throwing an exception.
+ dbCloseLock.readLock().lock();
+
try
{
- // Take the lock. From now on, whatever error that happen in the life
- // of this cursor should end by unlocking that lock. We must also
- // unlock it when throwing an exception.
- dbCloseLock.readLock().lock();
-
- cursor = db.openCursor(txn, null);
+ localCursor = db.openCursor(localTxn, null);
if (startingDraftCN >= 0)
{
- key = new ReplicationDraftCNKey(startingDraftCN);
- entry = new DatabaseEntry();
-
- if (cursor.getSearchKey(key, entry, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
+ if (localCursor.getSearchKey(
+ key, entry, LockMode.DEFAULT) != OperationStatus.SUCCESS)
{
// We could not move the cursor to the expected startingChangeNumber
- if (cursor.getSearchKeyRange(key, entry, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
+ if (localCursor.getSearchKeyRange(key, entry,
+ LockMode.DEFAULT) != OperationStatus.SUCCESS)
{
// We could not even move the cursor closed to it => failure
- throw new Exception("ChangeLog Draft Change Number " +
- startingDraftCN + " is not available");
+ throw new Exception("ChangeLog Draft Change Number "
+ + startingDraftCN + " is not available");
}
else
{
@@ -435,57 +432,76 @@
// Let's create a cursor from that point.
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
+ if (localCursor.getPrev(
+ key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS)
{
- closeLockedCursor(cursor);
- dbCloseLock.readLock().lock();
- cursor = db.openCursor(txn, null);
+ localCursor.close();
+ localCursor = db.openCursor(localTxn, null);
}
}
}
- else
- {
- // success : key has the right value
- }
}
+
+ this.txn = localTxn;
+ this.cursor = localCursor;
}
catch (Exception e)
{
// Unlocking is required before throwing any exception
- closeLockedCursor(cursor);
- throw (e);
+ closeLockedCursor(localCursor);
+ throw e;
}
}
- private DraftCNDBCursor() throws DatabaseException
+
+
+ private DraftCNDBCursor() throws Exception
{
+ Transaction localTxn = null;
+ Cursor localCursor = null;
+
+ this.key = new DatabaseEntry();
+ this.entry = new DatabaseEntry();
+
+ // We'll go on only if no close or no clear is running
+ dbCloseLock.readLock().lock();
try
{
- // We'll go on only if no close or no clear is running
- dbCloseLock.readLock().lock();
-
// Create the transaction that will protect whatever done with this
// write cursor.
- txn = dbenv.beginTransaction();
+ localTxn = dbenv.beginTransaction();
+ localCursor = db.openCursor(localTxn, null);
- cursor = db.openCursor(txn, null);
+ this.txn = localTxn;
+ this.cursor = localCursor;
}
- catch(DatabaseException e)
+ catch (Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- if (txn != null)
+ try
+ {
+ closeLockedCursor(localCursor);
+ }
+ catch (DatabaseException ignored)
+ {
+ // Ignore.
+ TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+ }
+
+ if (localTxn != null)
{
try
{
- txn.abort();
+ localTxn.abort();
}
- catch (DatabaseException dbe)
- {}
+ catch (DatabaseException ignored)
+ {
+ // Ignore.
+ TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+ }
}
- closeLockedCursor(cursor);
- throw (e);
+ throw e;
}
}
@@ -494,33 +510,50 @@
*/
public void close()
{
+ synchronized (this)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+ }
+
+ boolean closeHasFailed = false;
+
try
{
closeLockedCursor(cursor);
- cursor = null;
}
- catch (DatabaseException e)
+ catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
+
if (txn != null)
{
try
{
txn.commit();
- } catch (DatabaseException e)
+ }
+ catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
}
+
+ if (closeHasFailed)
+ {
+ replicationServer.shutdown();
+ }
}
/**
@@ -532,49 +565,63 @@
*/
public void abort()
{
- if (cursor == null)
- return;
+ synchronized (this)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+ }
+
+ boolean closeHasFailed = false;
+
try
{
closeLockedCursor(cursor);
- cursor = null;
}
- catch (LockConflictException e1)
+ catch (LockConflictException e)
{
// The DB documentation states that a DeadlockException
// on the close method of a cursor that is aborting should
// be ignored.
}
- catch (DatabaseException e)
+ catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
+
if (txn != null)
{
try
{
txn.abort();
- } catch (DatabaseException e)
+ }
+ catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
}
+
+ if (closeHasFailed)
+ {
+ replicationServer.shutdown();
+ }
}
/**
* Getter for the value field of the current cursor.
* @return The current value field.
- * @throws DatabaseException When an error happens.
*/
- public String currentValue() throws DatabaseException
+ public String currentValue()
{
try
{
@@ -598,9 +645,8 @@
/**
* Getter for the serviceID field of the current cursor.
* @return The current serviceID.
- * @throws DatabaseException When an error happens.
*/
- public String currentServiceID() throws DatabaseException
+ public String currentServiceID()
{
try
{
@@ -616,7 +662,7 @@
}
catch(Exception e)
{
-
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
return null;
}
@@ -624,9 +670,8 @@
/**
* Returns the replication changeNumber associated with the current key.
* @return the replication changeNumber
- * @throws DatabaseException when a problem occurs.
*/
- public ChangeNumber currentChangeNumber() throws DatabaseException
+ public ChangeNumber currentChangeNumber()
{
try
{
@@ -672,6 +717,16 @@
{
cursor.delete();
}
+
+ /**
+ * Returns the current key associated with this cursor.
+ *
+ * @return The current key associated with this cursor.
+ */
+ public DatabaseEntry getKey()
+ {
+ return key;
+ }
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index 567d289..dcf15e6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -417,16 +417,16 @@
// could not handle the Deadlock after DEADLOCK_RETRIES tries.
// shutdown the ReplicationServer.
shutdown = true;
- throw (e);
+ throw e;
}
}
- catch (DatabaseException e)
+ catch (Exception e)
{
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
shutdown = true;
cursor.abort();
- throw (e);
+ throw e;
}
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
index e988aea..cd858e7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -124,7 +125,7 @@
*/
public int getDraftCN()
{
- ReplicationDraftCNKey sk = (ReplicationDraftCNKey)this.draftCNDbCursor.key;
+ ReplicationDraftCNKey sk = (ReplicationDraftCNKey) draftCNDbCursor.getKey();
int currentSeqnum = sk.getDraftCN();
return currentSeqnum;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index f29bc2a..1b933fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -350,46 +350,61 @@
new ReplicationIteratorComparator();
SortedSet<ReplicationIterator> iteratorSortedSet =
new TreeSet<ReplicationIterator>(comparator);
- /* fill the lateQueue */
- for (int serverId : replicationServerDomain.getServers())
+ try
{
- ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
- ReplicationIterator iterator =
- replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- if (iterator != null)
+ /* fill the lateQueue */
+ for (int serverId : replicationServerDomain.getServers())
{
- if (iterator.getChange() != null)
+ ChangeNumber lastCsn = serverState
+ .getMaxChangeNumber(serverId);
+ ReplicationIterator iterator = replicationServerDomain
+ .getChangelogIterator(serverId, lastCsn);
+ if (iterator != null)
+ {
+ if (iterator.getChange() != null)
+ {
+ iteratorSortedSet.add(iterator);
+ }
+ else
+ {
+ iterator.releaseCursor();
+ }
+ }
+ }
+
+ // The loop below relies on the fact that it is sorted based
+ // on the currentChange of each iterator to consider the next
+ // change across all servers.
+ //
+ // Hence it is necessary to remove and eventual add again an
+ // iterator when looping in order to keep consistent the order of
+ // the iterators (see ReplicationIteratorComparator.
+ while (!iteratorSortedSet.isEmpty()
+ && (lateQueue.count() < 100)
+ && (lateQueue.bytesCount() < 50000))
+ {
+ ReplicationIterator iterator = iteratorSortedSet
+ .first();
+ iteratorSortedSet.remove(iterator);
+ lateQueue.add(iterator.getChange());
+ if (iterator.next())
{
iteratorSortedSet.add(iterator);
- } else
+ }
+ else
{
iterator.releaseCursor();
}
}
}
-
- // The loop below relies on the fact that it is sorted based
- // on the currentChange of each iterator to consider the next
- // change across all servers.
- // Hence it is necessary to remove and eventual add again an iterator
- // when looping in order to keep consistent the order of the
- // iterators (see ReplicationIteratorComparator.
- while (!iteratorSortedSet.isEmpty() &&
- (lateQueue.count()<100) &&
- (lateQueue.bytesCount()<50000) )
+ finally
{
- ReplicationIterator iterator = iteratorSortedSet.first();
- iteratorSortedSet.remove(iterator);
- lateQueue.add(iterator.getChange());
- if (iterator.next())
- iteratorSortedSet.add(iterator);
- else
+ for (ReplicationIterator iterator : iteratorSortedSet)
+ {
iterator.releaseCursor();
+ }
}
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
+
/*
* If the late queue is empty then we could not find any
* messages in the replication log so the remote serevr is not
@@ -527,9 +542,16 @@
// if that iterator has changes, then it is a candidate
// it is added in the sorted list at a position given by its
// current change (see ReplicationIteratorComparator).
- if ((iterator != null) && (iterator.getChange() != null))
+ if (iterator != null)
{
- iteratorSortedSet.add(iterator);
+ if (iterator.getChange() != null)
+ {
+ iteratorSortedSet.add(iterator);
+ }
+ else
+ {
+ iterator.releaseCursor();
+ }
}
}
UpdateMsg msg = iteratorSortedSet.first().getChange();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 7330fac..b512b7e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -30,6 +30,8 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.util.StaticUtils.decodeUTF8;
+import static org.opends.server.util.StaticUtils.getBytes;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
@@ -37,8 +39,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
+
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.zip.DataFormatException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -144,12 +146,12 @@
// Initialize counter
this.counterCurrValue = 1;
cursor = db.openCursor(txn, null);
- status = cursor.getLast(key, data, LockMode.DEFAULT);
- while (status == OperationStatus.SUCCESS)
+ try
{
- try
+ status = cursor.getLast(key, data, LockMode.DEFAULT);
+ while (status == OperationStatus.SUCCESS)
{
- ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
+ ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
if (!ReplicationDB.isaCounter(cn))
{
status = cursor.getPrev(key, data, LockMode.DEFAULT);
@@ -158,38 +160,17 @@
else
{
// counter record
- counterCurrValue = decodeCounterValue(data.getData())+1;
+ counterCurrValue = decodeCounterValue(data.getData()) + 1;
counterTsLimit = cn.getTime();
break;
}
}
- catch (UnsupportedEncodingException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shuting down.
- }
- }
- replicationServer.shutdown();
- }
- catch (DataFormatException e)
- {
- // Should never happen
- }
+ counterCurrValue += distBackToCounterRecord;
}
- counterCurrValue += distBackToCounterRecord;
- cursor.close();
-
+ finally
+ {
+ cursor.close();
+ }
}
/**
@@ -377,55 +358,38 @@
String str = null;
ChangeNumber cn = null;
+ dbCloseLock.readLock().lock();
try
{
- dbCloseLock.readLock().lock();
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
cursor = db.openCursor(null, null);
- }
- catch (DatabaseException e1)
- {
- dbCloseLock.readLock().unlock();
- return null;
- }
- try
- {
- try
+
+ OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+
+ if (status != OperationStatus.SUCCESS)
{
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+ /* database is empty */
+ return null;
+ }
+
+ str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ // First record is a counter record .. go next
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
if (status != OperationStatus.SUCCESS)
{
- /* database is empty */
+ // DB contains only a counter record
return null;
}
- try
+ else
{
- str = new String(key.getData(), "UTF-8");
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- // First record is a counter record .. go next
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- // DB contains only a counter record
- return null;
- }
- else
- {
- cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
- }
- }
- } catch (UnsupportedEncodingException e)
- {
- // never happens
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
}
}
- finally
- {
- closeLockedCursor(cursor);
- }
}
catch (DatabaseException e)
{
@@ -437,11 +401,18 @@
replicationServer.shutdown();
cn = null;
}
+ finally
+ {
+ closeLockedCursor(cursor);
+ }
return cn;
}
+
+
/**
* Read the last Change from the database.
+ *
* @return the last ChangeNumber.
*/
public ChangeNumber readLastChange()
@@ -449,43 +420,36 @@
Cursor cursor = null;
ChangeNumber cn = null;
+ dbCloseLock.readLock().lock();
try
{
- dbCloseLock.readLock().lock();
- try
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
+ cursor = db.openCursor(null, null);
+
+ OperationStatus status = cursor.getLast(key, data,
+ LockMode.DEFAULT);
+
+ if (status != OperationStatus.SUCCESS)
{
- cursor = db.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- /* database is empty */
- return null;
- }
- try
- {
- String str = new String(key.getData(), "UTF-8");
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
- {
- /* database only contain a counter record - don't know
- * how much it can be possible but ... */
- cn = null;
- }
- }
- }
- catch (UnsupportedEncodingException e)
- {
- // never happens
- }
+ /* database is empty */
+ return null;
}
- finally
+
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
{
- closeLockedCursor(cursor);
+ if (cursor.getPrev(key, data,
+ LockMode.DEFAULT) != OperationStatus.SUCCESS)
+ {
+ /*
+ * database only contain a counter record - don't know how much it can
+ * be possible but ...
+ */
+ cn = null;
+ }
}
}
catch (DatabaseException e)
@@ -497,6 +461,11 @@
replicationServer.shutdown();
cn = null;
}
+ finally
+ {
+ closeLockedCursor(cursor);
+ }
+
return cn;
}
@@ -515,44 +484,56 @@
*/
public class ReplServerDBCursor
{
- private Cursor cursor = null;
-
- // The transaction that will protect the actions done with the cursor
+ // The transaction that will protect the actions done with the cursor
// Will be let null for a read cursor
// Will be set non null for a write cursor
- private Transaction txn = null;
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
+ private final Transaction txn;
+ private final Cursor cursor;
+ private final DatabaseEntry key;
+ private final DatabaseEntry data;
+
+ private boolean isClosed = false;
/**
* Creates a ReplServerDBCursor that can be used for browsing a
* replicationServer db.
*
- * @param startingChangeNumber The ChangeNumber from which the cursor must
- * start.
- * @throws Exception When the startingChangeNumber does not exist.
+ * @param startingChangeNumber
+ * The ChangeNumber from which the cursor must start.
+ * @throws Exception
+ * When the startingChangeNumber does not exist.
*/
private ReplServerDBCursor(ChangeNumber startingChangeNumber)
- throws Exception
+ throws Exception
{
+ if (startingChangeNumber != null)
+ {
+ key = new ReplicationKey(startingChangeNumber);
+ }
+ else
+ {
+ key = new DatabaseEntry();
+ }
+ data = new DatabaseEntry();
+
+ txn = null;
+
+ // Take the lock. From now on, whatever error that happen in the life
+ // of this cursor should end by unlocking that lock. We must also
+ // unlock it when throwing an exception.
+ dbCloseLock.readLock().lock();
+
+ Cursor localCursor = null;
try
{
- // Take the lock. From now on, whatever error that happen in the life
- // of this cursor should end by unlocking that lock. We must also
- // unlock it when throwing an exception.
- dbCloseLock.readLock().lock();
-
- cursor = db.openCursor(txn, null);
+ localCursor = db.openCursor(txn, null);
if (startingChangeNumber != null)
{
- key = new ReplicationKey(startingChangeNumber);
- data = new DatabaseEntry();
-
- if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
+ if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
// We could not move the cursor to the expected startingChangeNumber
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+ if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
// We could not even move the cursor closed to it => failure
@@ -564,51 +545,75 @@
// Let's create a cursor from that point.
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
- if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+ if (localCursor.getPrev(key, data, LockMode.DEFAULT) !=
OperationStatus.SUCCESS)
{
- closeLockedCursor(cursor);
- dbCloseLock.readLock().lock();
- cursor = db.openCursor(txn, null);
+ localCursor.close();
+ localCursor = db.openCursor(txn, null);
}
}
}
}
+ cursor = localCursor;
}
catch (Exception e)
{
- // Unlocking is required before throwing any exception
- closeLockedCursor(cursor);
- throw (e);
+ // Unlocking is required before throwing any exception
+ try
+ {
+ closeLockedCursor(localCursor);
+ }
+ catch (Exception ignore)
+ {
+ // Ignore.
+ }
+ throw e;
}
}
- private ReplServerDBCursor() throws DatabaseException
+ private ReplServerDBCursor() throws Exception
{
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+
+ // We'll go on only if no close or no clear is running
+ dbCloseLock.readLock().lock();
+
+ Transaction localTxn = null;
+ Cursor localCursor = null;
try
{
- // We'll go on only if no close or no clear is running
- dbCloseLock.readLock().lock();
-
// Create the transaction that will protect whatever done with this
// write cursor.
- txn = dbenv.beginTransaction();
+ localTxn = dbenv.beginTransaction();
+ localCursor = db.openCursor(localTxn, null);
- cursor = db.openCursor(txn, null);
+ txn = localTxn;
+ cursor = localCursor;
}
- catch(DatabaseException e)
+ catch (Exception e)
{
- if (txn != null)
+ try
+ {
+ closeLockedCursor(localCursor);
+ }
+ catch (Exception ignore)
+ {
+ // Ignore.
+ }
+
+ if (localTxn != null)
{
try
{
- txn.abort();
+ localTxn.abort();
}
- catch (DatabaseException dbe)
- {}
+ catch (DatabaseException ignore)
+ {
+ // Ignore.
+ }
}
- closeLockedCursor(cursor);
- throw (e);
+ throw e;
}
}
@@ -617,10 +622,20 @@
*/
public void close()
{
+ synchronized (this)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+ }
+
+ boolean closeHasFailed = false;
+
try
{
closeLockedCursor(cursor);
- cursor = null;
}
catch (DatabaseException e)
{
@@ -628,22 +643,29 @@
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
+
if (txn != null)
{
try
{
txn.commit();
- } catch (DatabaseException e)
+ }
+ catch (DatabaseException e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
}
+
+ if (closeHasFailed)
+ {
+ replicationServer.shutdown();
+ }
}
/**
@@ -655,14 +677,22 @@
*/
public void abort()
{
- if (cursor == null)
- return;
+ synchronized (this)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+ }
+
+ boolean closeHasFailed = false;
+
try
{
closeLockedCursor(cursor);
- cursor = null;
}
- catch (LockConflictException e1)
+ catch (LockConflictException e)
{
// The DB documentation states that a DeadlockException
// on the close method of a cursor that is aborting should
@@ -674,22 +704,29 @@
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
+
if (txn != null)
{
try
{
txn.abort();
- } catch (DatabaseException e)
+ }
+ catch (DatabaseException e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- replicationServer.shutdown();
+ closeHasFailed = true;
}
}
+
+ if (closeHasFailed)
+ {
+ replicationServer.shutdown();
+ }
}
/**
@@ -706,15 +743,8 @@
{
return null;
}
- try
- {
- String csnString = new String(key.getData(), "UTF-8");
- return new ChangeNumber(csnString);
- } catch (UnsupportedEncodingException e)
- {
- // can't happen
- return null;
- }
+ String csnString = decodeUTF8(key.getData());
+ return new ChangeNumber(csnString);
}
/**
@@ -738,26 +768,29 @@
{
return null;
}
+
try
{
- ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
- if(ReplicationDB.isaCounter(cn))
+ ChangeNumber cn = new ChangeNumber(
+ decodeUTF8(key.getData()));
+ if (ReplicationDB.isaCounter(cn))
{
// counter record
continue;
}
- currentChange = ReplicationData.generateChange(data.getData());
- } catch (Exception e) {
+ currentChange = ReplicationData.generateChange(data
+ .getData());
+ }
+ catch (Exception e)
+ {
/*
* An error happening trying to convert the data from the
- * replicationServer database to an Update Message.
- * This can only happen if the database is corrupted.
- * There is not much more that we can do at this point except trying
- * to continue with the next record.
- * In such case, it is therefore possible that we miss some changes.
- * TODO. log an error message.
- * TODO : REPAIR : Such problem should be handled by the
- * repair functionality.
+ * replicationServer database to an Update Message. This can only
+ * happen if the database is corrupted. There is not much more that we
+ * can do at this point except trying to continue with the next
+ * record. In such case, it is therefore possible that we miss some
+ * changes. TODO. log an error message. TODO : REPAIR : Such problem
+ * should be handled by the repair functionality.
*/
}
}
@@ -859,7 +892,7 @@
while (status == OperationStatus.SUCCESS)
{
// test whether the record is a regular change or a counter
- String csnString = new String(key.getData(), "UTF-8");
+ String csnString = decodeUTF8(key.getData());
cn = new ChangeNumber(csnString);
if (cn.getServerId() != 0)
{
@@ -900,7 +933,7 @@
status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
}
else
{
@@ -915,7 +948,7 @@
}
while (status == OperationStatus.SUCCESS)
{
- cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
if (!ReplicationDB.isaCounter(cn))
{
// regular change record
@@ -952,18 +985,6 @@
}
}
}
- catch (UnsupportedEncodingException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- }
- catch (DataFormatException e)
- {
- // Should never happen
- }
finally
{
if (cursor != null)
@@ -975,7 +996,7 @@
txn.abort();
} catch (DatabaseException e1)
{
- // can't do much more. The ReplicationServer is shuting down.
+ // can't do much more. The ReplicationServer is shutting down.
}
}
}
@@ -996,33 +1017,22 @@
* Decode the provided database entry as a the value of a counter.
* @param entry The provided entry.
* @return The counter value.
- * @throws DataFormatException
*/
private static int decodeCounterValue(byte[] entry)
- throws DataFormatException
{
- try
- {
- String numAckStr = new String(entry, 0, entry.length, "UTF-8");
- return Integer.parseInt(numAckStr);
-
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ String numAckStr = decodeUTF8(entry);
+ return Integer.parseInt(numAckStr);
}
/**
* Encode the provided counter value in a database entry.
* @param entry The provided entry.
- * @return The databse entry with the counter value encoded inside..
- * @throws UnsupportedEncodingException
+ * @return The database entry with the counter value encoded inside.
*/
static private DatabaseEntry encodeCounterValue(int value)
- throws UnsupportedEncodingException
{
DatabaseEntry entry = new DatabaseEntry();
- entry.setData(String.valueOf(value).getBytes("UTF-8"));
+ entry.setData(getBytes(String.valueOf(value)));
return entry;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index a5049b7..cb34f92 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -32,6 +32,8 @@
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.types.DebugLogLevel;
+
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -131,9 +133,9 @@
*/
private void start() throws DatabaseException, ReplicationDBException
{
- Cursor cursor = stateDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
+ Cursor cursor = stateDb.openCursor(null, null);
try
{
@@ -259,7 +261,14 @@
}
finally
{
- cursor.close();
+ try
+ {
+ cursor.close();
+ }
+ catch (Exception ignored)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4f60457..2b1890b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1452,33 +1452,6 @@
}
/**
- * Creates and returns an iterator.
- * When the iterator is not used anymore, the caller MUST call the
- * ReplicationIterator.releaseCursor() method to free the resources
- * and locks used by the ReplicationIterator.
- *
- * @param serverId Identifier of the server for which the iterator is created.
- * @param changeNumber Starting point for the iterator.
- * @return the created ReplicationIterator. Null when no DB is available
- * for the provided server Id.
- */
- public ReplicationIterator getIterator(int serverId,
- ChangeNumber changeNumber)
- {
- DbHandler handler = sourceDbHandlers.get(serverId);
- if (handler == null)
- return null;
- try
- {
- ReplicationIterator it = handler.generateIterator(changeNumber);
- return it;
- } catch (Exception e)
- {
- return null;
- }
- }
-
- /**
* Returns the change count for that ReplicationServerDomain.
*
* @return the change count.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/util/StaticUtils.java b/opendj-sdk/opends/src/server/org/opends/server/util/StaticUtils.java
index f62d1f7..84ad567 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/util/StaticUtils.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/util/StaticUtils.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.util;
@@ -31,13 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -170,6 +165,61 @@
/**
+ * Returns the provided byte array decoded as a UTF-8 string without throwing
+ * an UnsupportedEncodingException. This method is equivalent to:
+ *
+ * <pre>
+ * try
+ * {
+ * return new String(bytes, "UTF-8");
+ * }
+ * catch (UnsupportedEncodingException e)
+ * {
+ * // Should never happen: UTF-8 is always supported.
+ * throw new RuntimeException(e);
+ * }
+ * </pre>
+ *
+ * @param bytes
+ * The byte array to be decoded as a UTF-8 string.
+ * @return The decoded string.
+ */
+ public static String decodeUTF8(final byte[] bytes)
+ {
+ Validator.ensureNotNull(bytes);
+
+ if (bytes.length == 0)
+ {
+ return "".intern();
+ }
+
+ final StringBuilder builder = new StringBuilder(bytes.length);
+ final int sz = bytes.length;
+
+ for (int i = 0; i < sz; i++)
+ {
+ final byte b = bytes[i];
+ if ((b & 0x7f) != b)
+ {
+ try
+ {
+ builder.append(new String(bytes, i, (sz - i), "UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // Should never happen: UTF-8 is always supported.
+ throw new RuntimeException(e);
+ }
+ break;
+ }
+ builder.append((char) b);
+ }
+ return builder.toString();
+ }
+
+
+
+ /**
* Construct a byte array containing the UTF-8 encoding of the
* provided <code>char</code> array.
*
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java
index 794942b..48d4154 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -126,26 +127,31 @@
assertEquals(handler.getLastKey(), sn3);
DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
- assertEquals(dbc.currentChangeNumber(), changeNumber1);
- assertEquals(dbc.currentServiceID(), serviceID1);
- assertEquals(dbc.currentValue(), value1);
- assertTrue(dbc.toString().length() != 0);
+ try
+ {
+ assertEquals(dbc.currentChangeNumber(), changeNumber1);
+ assertEquals(dbc.currentServiceID(), serviceID1);
+ assertEquals(dbc.currentValue(), value1);
+ assertTrue(dbc.toString().length() != 0);
- assertTrue(dbc.next());
+ assertTrue(dbc.next());
- assertEquals(dbc.currentChangeNumber(), changeNumber2);
- assertEquals(dbc.currentServiceID(), serviceID2);
- assertEquals(dbc.currentValue(), value2);
+ assertEquals(dbc.currentChangeNumber(), changeNumber2);
+ assertEquals(dbc.currentServiceID(), serviceID2);
+ assertEquals(dbc.currentValue(), value2);
- assertTrue(dbc.next());
+ assertTrue(dbc.next());
- assertEquals(dbc.currentChangeNumber(), changeNumber3);
- assertEquals(dbc.currentServiceID(), serviceID3);
- assertEquals(dbc.currentValue(), value3);
+ assertEquals(dbc.currentChangeNumber(), changeNumber3);
+ assertEquals(dbc.currentServiceID(), serviceID3);
+ assertEquals(dbc.currentValue(), value3);
- assertFalse(dbc.next());
-
- handler.releaseReadCursor(dbc);
+ assertFalse(dbc.next());
+ }
+ finally
+ {
+ handler.releaseReadCursor(dbc);
+ }
handler.setPurgeDelay(100);
@@ -258,25 +264,43 @@
assertEquals(handler.getValue(sn3),value3);
DraftCNDbIterator it = handler.generateIterator(sn1);
- assertEquals(it.getDraftCN(),sn1);
- assertTrue(it.next());
- assertEquals(it.getDraftCN(),sn2);
- assertTrue(it.next());
- assertEquals(it.getDraftCN(),sn3);
- assertFalse(it.next());
- it.releaseCursor();
+ try
+ {
+ assertEquals(it.getDraftCN(), sn1);
+ assertTrue(it.next());
+ assertEquals(it.getDraftCN(), sn2);
+ assertTrue(it.next());
+ assertEquals(it.getDraftCN(), sn3);
+ assertFalse(it.next());
+ }
+ finally
+ {
+ it.releaseCursor();
+ }
it = handler.generateIterator(sn2);
- assertEquals(it.getDraftCN(),sn2);
- assertTrue(it.next());
- assertEquals(it.getDraftCN(),sn3);
- assertFalse(it.next());
- it.releaseCursor();
+ try
+ {
+ assertEquals(it.getDraftCN(), sn2);
+ assertTrue(it.next());
+ assertEquals(it.getDraftCN(), sn3);
+ assertFalse(it.next());
+ }
+ finally
+ {
+ it.releaseCursor();
+ }
it = handler.generateIterator(sn3);
- assertEquals(it.getDraftCN(),sn3);
- assertFalse(it.next());
- it.releaseCursor();
+ try
+ {
+ assertEquals(it.getDraftCN(), sn3);
+ assertFalse(it.next());
+ }
+ finally
+ {
+ it.releaseCursor();
+ }
// Clear ...
handler.clear();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java
index ef9d5c8..fccde2c 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.util;
@@ -138,6 +139,21 @@
}
/**
+ * Tests the {@link StaticUtils#decodeUTF8(byte[])} method.
+ *
+ * @param inputString
+ * The input string.
+ * @throws Exception
+ * If the test failed unexpectedly.
+ */
+ @Test(dataProvider = "getBytesTestData")
+ public void testDecodeUTF8(String inputString) throws Exception
+ {
+ final byte[] bytes = inputString.getBytes("UTF-8");
+ Assert.assertEquals(StaticUtils.decodeUTF8(bytes), inputString);
+ }
+
+ /**
* Tests the {@link StaticUtils#getBytes(String)} method.
*
* @param inputString
@@ -1174,7 +1190,7 @@
* If the test failed unexpectedly.
*/
@Test(dataProvider = "listsAreEqualTestData")
- public void testListsAreEqual(List list1, List list2, boolean result)
+ public void testListsAreEqual(List<?> list1, List<?> list2, boolean result)
throws Exception {
Assert.assertEquals(StaticUtils.listsAreEqual(list1, list2), result);
}
--
Gitblit v1.10.0