From 45a8024fe68e7bc451a5a22afcaf31e7edb745a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 12 Aug 2013 15:22:03 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/DraftCNDB.java | 224 +++++++++++++++++++++++++++++--------------------------
1 files changed, 117 insertions(+), 107 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index 2d2e04c..5d7db0d 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -27,6 +27,19 @@
*/
package org.opends.server.replication.server;
+import java.io.Closeable;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DebugLogLevel;
+
+import com.sleepycat.je.*;
+
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -35,16 +48,6 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
-import java.io.Closeable;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.opends.messages.MessageBuilder;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.types.DebugLogLevel;
-
-import com.sleepycat.je.*;
-
/**
* This class implements the interface between the underlying database
* and the dbHandler class.
@@ -53,6 +56,8 @@
public class DraftCNDB
{
private static final DebugTracer TRACER = getTracer();
+ private static final int DATABASE_EMPTY = 0;
+
private Database db = null;
private ReplicationDbEnv dbenv = null;
private ReplicationServer replicationServer;
@@ -61,27 +66,23 @@
* The lock used to provide exclusive access to the thread that close the db
* (shutdown or clear).
*/
- private ReentrantReadWriteLock dbCloseLock;
+ private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
/**
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
* @param replicationServer The ReplicationServer that needs to be shutdown.
* @param dbenv The Db environment to use to create the db.
- * @throws DatabaseException If a database problem happened.
+ * @throws ChangelogException If a database problem happened.
*/
- public DraftCNDB(
- ReplicationServer replicationServer,
- ReplicationDbEnv dbenv)
- throws DatabaseException
+ public DraftCNDB(ReplicationServer replicationServer, ReplicationDbEnv dbenv)
+ throws ChangelogException
{
this.dbenv = dbenv;
this.replicationServer = replicationServer;
// Get or create the associated ReplicationServerDomain and Db.
db = dbenv.getOrCreateDraftCNDb();
-
- dbCloseLock = new ReentrantReadWriteLock(true);
}
/**
@@ -101,8 +102,7 @@
try
{
DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
- DatabaseEntry data = new DraftCNData(
- value, domainBaseDN, changeNumber);
+ DatabaseEntry data = new DraftCNData(value, domainBaseDN, changeNumber);
// Use a transaction so that we can override durability.
Transaction txn = null;
@@ -121,24 +121,40 @@
}
finally
{
- if (txn != null)
- {
- // No effect if txn has committed.
- try
- {
- txn.abort();
- }
- catch (Exception e)
- {
- // Ignored.
- }
- }
+ abort(txn);
dbCloseLock.readLock().unlock();
}
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
+ }
+ catch (ChangelogException e)
+ {
+ replicationServer.handleUnexpectedChangelogException(e);
+ }
+ }
+
+ /**
+ * Aborts the current transaction. It has no effect if the transaction has
+ * committed.
+ *
+ * @param txn
+ * the transaction to abort
+ */
+ private static void abort(Transaction txn)
+ {
+ if (txn != null)
+ {
+ try
+ {
+ txn.abort();
+ }
+ catch (DatabaseException ignored)
+ {
+ // Ignore.
+ TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+ }
}
}
@@ -147,18 +163,11 @@
*/
public void shutdown()
{
+ dbCloseLock.writeLock().lock();
try
{
- dbCloseLock.writeLock().lock();
- try
- {
- db.close();
- db = null;
- }
- finally
- {
- dbCloseLock.writeLock().unlock();
- }
+ db.close();
+ db = null;
}
catch (DatabaseException e)
{
@@ -167,19 +176,21 @@
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
+ finally
+ {
+ dbCloseLock.writeLock().unlock();
+ }
}
/**
* Create a cursor that can be used to search or iterate on this DB.
*
* @param draftCN The draftCN from which the cursor must start.
- * @throws DatabaseException If a database error prevented the cursor
+ * @throws ChangelogException If a database error prevented the cursor
* creation.
- * @throws Exception if the ReplServerDBCursor creation failed.
* @return The ReplServerDBCursor.
*/
- public DraftCNDBCursor openReadCursor(int draftCN)
- throws DatabaseException, Exception
+ public DraftCNDBCursor openReadCursor(int draftCN) throws ChangelogException
{
return new DraftCNDBCursor(draftCN);
}
@@ -188,14 +199,11 @@
* Create a cursor that can be used to delete some record from this
* ReplicationServer database.
*
- * @throws DatabaseException If a database error prevented the cursor
+ * @throws ChangelogException If a database error prevented the cursor
* creation.
- * @throws Exception if the ReplServerDBCursor creation failed.
- *
* @return The ReplServerDBCursor.
*/
- public DraftCNDBCursor openDeleteCursor()
- throws DatabaseException, Exception
+ public DraftCNDBCursor openDeleteCursor() throws ChangelogException
{
return new DraftCNDBCursor();
}
@@ -235,11 +243,10 @@
DatabaseEntry entry = new DatabaseEntry();
if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS)
{
- /* database is empty */
- return 0;
+ return DATABASE_EMPTY;
}
- return new Integer(decodeUTF8(key.getData()));
+ return Integer.parseInt(decodeUTF8(key.getData()));
}
finally
{
@@ -248,8 +255,7 @@
}
catch (DatabaseException e)
{
- /* database is faulty */
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
return 0;
}
}
@@ -305,11 +311,10 @@
DatabaseEntry entry = new DatabaseEntry();
if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS)
{
- /* database is empty */
- return 0;
+ return DATABASE_EMPTY;
}
- return new Integer(decodeUTF8(key.getData()));
+ return Integer.parseInt(decodeUTF8(key.getData()));
}
finally
{
@@ -318,11 +323,17 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
return 0;
}
}
+ private void handleUnexpectedDatabaseException(DatabaseException e)
+ {
+ ChangelogException ex = new ChangelogException(e);
+ replicationServer.handleUnexpectedChangelogException(ex);
+ }
+
/**
* {@inheritDoc}
*/
@@ -357,10 +368,10 @@
*
* @param startingDraftCN
* the draftCN from which the cursor must start.
- * @throws Exception
+ * @throws ChangelogException
* when the startingDraftCN does not exist.
*/
- private DraftCNDBCursor(int startingDraftCN) throws Exception
+ private DraftCNDBCursor(int startingDraftCN) throws ChangelogException
{
this.key = new ReplicationDraftCNKey(startingDraftCN);
this.entry = new DatabaseEntry();
@@ -391,8 +402,9 @@
if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS)
{
// We could not even move the cursor closed to it => failure
- throw new Exception("ChangeLog Draft Change Number "
- + startingDraftCN + " is not available");
+ throw new ChangelogException(
+ Message.raw("ChangeLog Draft Change Number " + startingDraftCN
+ + " is not available"));
}
if (localCursor.getPrev(key, entry, LockMode.DEFAULT) != SUCCESS)
@@ -414,7 +426,13 @@
this.txn = null;
this.cursor = localCursor;
}
- catch (Exception e)
+ catch (DatabaseException e)
+ {
+ // Unlocking is required before throwing any exception
+ closeLockedCursor(localCursor);
+ throw new ChangelogException(e);
+ }
+ catch (ChangelogException e)
{
// Unlocking is required before throwing any exception
closeLockedCursor(localCursor);
@@ -424,7 +442,7 @@
- private DraftCNDBCursor() throws Exception
+ private DraftCNDBCursor() throws ChangelogException
{
Transaction localTxn = null;
Cursor localCursor = null;
@@ -453,32 +471,20 @@
this.txn = localTxn;
this.cursor = localCursor;
}
- catch (Exception e)
+ catch (DatabaseException e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- try
- {
- closeLockedCursor(localCursor);
- }
- catch (DatabaseException ignored)
- {
- // Ignore.
- TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
- }
+ closeLockedCursor(localCursor);
+ DraftCNDB.abort(localTxn);
+ throw new ChangelogException(e);
+ }
+ catch (ChangelogException e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
- if (localTxn != null)
- {
- try
- {
- localTxn.abort();
- }
- catch (DatabaseException ignored)
- {
- // Ignore.
- TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
- }
- }
+ closeLockedCursor(localCursor);
+ DraftCNDB.abort(localTxn);
throw e;
}
}
@@ -508,7 +514,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
}
}
@@ -541,7 +547,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
}
}
@@ -593,7 +599,6 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
-
return null;
}
@@ -619,7 +624,6 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
-
return -1;
}
@@ -651,22 +655,22 @@
/**
* Go to the next record on the cursor.
* @return the next record on this cursor.
- * @throws DatabaseException a.
+ * @throws ChangelogException a.
*/
- public boolean next() throws DatabaseException
+ public boolean next() throws ChangelogException
{
if (isClosed)
{
return false;
}
- OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- seqnumData = null;
- return false;
- }
try {
+ OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
+ {
+ seqnumData = null;
+ return false;
+ }
seqnumData = new DraftCNData(entry.getData());
}
catch(Exception e)
@@ -679,16 +683,23 @@
/**
* Delete the record at the current cursor position.
*
- * @throws DatabaseException In case of database problem.
+ * @throws ChangelogException In case of database problem.
*/
- public void delete() throws DatabaseException
+ public void delete() throws ChangelogException
{
if (isClosed)
{
throw new IllegalStateException("DraftCNDB already closed");
}
- cursor.delete();
+ try
+ {
+ cursor.delete();
+ }
+ catch (DatabaseException e)
+ {
+ throw new ChangelogException(e);
+ }
}
/**
@@ -710,10 +721,9 @@
/**
* Clears this change DB from the changes it contains.
*
- * @throws Exception Throws an exception it occurs.
- * @throws DatabaseException Throws a DatabaseException when it occurs.
+ * @throws ChangelogException Throws a DatabaseException when it occurs.
*/
- public void clear() throws Exception, DatabaseException
+ public void clear() throws ChangelogException
{
// The coming users will be blocked until the clear is done
dbCloseLock.writeLock().lock();
--
Gitblit v1.10.0