From 45a8024fe68e7bc451a5a22afcaf31e7edb745a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 12 Aug 2013 15:22:03 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 159 +++++++++++++++++++++++++++++++++-------------------
1 files changed, 101 insertions(+), 58 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 867d23d..b3d89ba 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -27,26 +27,28 @@
*/
package org.opends.server.replication.server;
-import static com.sleepycat.je.LockMode.*;
-import static com.sleepycat.je.OperationStatus.*;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.util.StaticUtils;
import com.sleepycat.je.*;
+import static com.sleepycat.je.LockMode.*;
+import static com.sleepycat.je.OperationStatus.*;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
/**
* This class implements the interface between the underlying database
* and the dbHandler class.
@@ -67,8 +69,7 @@
* The lock used to provide exclusive access to the thread that close the db
* (shutdown or clear).
*/
- private final ReentrantReadWriteLock dbCloseLock =
- new ReentrantReadWriteLock(true);
+ private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
// Change counter management
// The Db itself does not allow to count records between a start and an end
@@ -117,12 +118,12 @@
* @param baseDn The baseDn of the replication domain.
* @param replicationServer The ReplicationServer that needs to be shutdown.
* @param dbenv The Db environment to use to create the db.
- * @throws DatabaseException If a database problem happened.
+ * @throws ChangelogException If a database problem happened.
*/
public ReplicationDB(int serverId, String baseDn,
ReplicationServer replicationServer,
ReplicationDbEnv dbenv)
- throws DatabaseException
+ throws ChangelogException
{
this.serverId = serverId;
this.baseDn = baseDn;
@@ -138,13 +139,15 @@
intializeCounters();
}
- private void intializeCounters()
+ private void intializeCounters() throws ChangelogException
{
this.counterCurrValue = 1;
- Cursor cursor = db.openCursor(null, null);
+ Cursor cursor = null;
try
{
+ cursor = db.openCursor(null, null);
+
int distBackToCounterRecord = 0;
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
@@ -164,9 +167,13 @@
}
counterCurrValue += distBackToCounterRecord;
}
+ catch (DatabaseException e)
+ {
+ throw new ChangelogException(e);
+ }
finally
{
- cursor.close();
+ close(cursor);
}
}
@@ -205,7 +212,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
finally
{
@@ -213,7 +220,14 @@
}
}
+ private void handleUnexpectedDatabaseException(DatabaseException e)
+ {
+ ChangelogException ex = new ChangelogException(e);
+ replicationServer.handleUnexpectedChangelogException(ex);
+ }
+
private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
+ throws DatabaseException
{
if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
{
@@ -276,13 +290,12 @@
* ReplicationServer DB.
*
* @param changeNumber The ChangeNumber from which the cursor must start.
- * @throws DatabaseException If a database error prevented the cursor
- * creation.
- * @throws Exception if the ReplServerDBCursor creation failed.
+ * @throws ChangelogException
+ * When a problem occurs or the startingChangeNumber does not exist.
* @return The ReplServerDBCursor.
*/
public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
- throws DatabaseException, Exception
+ throws ChangelogException
{
return new ReplServerDBCursor(changeNumber);
}
@@ -291,21 +304,19 @@
* Create a cursor that can be used to delete some record from this
* ReplicationServer database.
*
- * @throws DatabaseException If a database error prevented the cursor
+ * @throws ChangelogException If a database error prevented the cursor
* creation.
- * @throws Exception if the ReplServerDBCursor creation failed.
*
* @return The ReplServerDBCursor.
*/
- public ReplServerDBCursor openDeleteCursor()
- throws DatabaseException, Exception
+ public ReplServerDBCursor openDeleteCursor() throws ChangelogException
{
return new ReplServerDBCursor();
}
- private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException
+ private void closeAndReleaseReadLock(Cursor cursor)
{
try
{
@@ -362,7 +373,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
return null;
}
finally
@@ -421,7 +432,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
return null;
}
finally
@@ -482,7 +493,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
finally
{
@@ -492,7 +503,7 @@
}
private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
- DatabaseEntry data)
+ DatabaseEntry data) throws DatabaseException
{
final ChangeNumber cn = toChangeNumber(key.getData());
if (!isACounterRecord(cn))
@@ -548,11 +559,11 @@
*
* @param startingChangeNumber
* The ChangeNumber from which the cursor must start.
- * @throws Exception
+ * @throws ChangelogException
* When the startingChangeNumber does not exist.
*/
private ReplServerDBCursor(ChangeNumber startingChangeNumber)
- throws Exception
+ throws ChangelogException
{
if (startingChangeNumber != null)
{
@@ -591,7 +602,8 @@
if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
{
// We could not even move the cursor closed to it => failure
- throw new Exception("ChangeNumber not available");
+ throw new ChangelogException(
+ Message.raw("ChangeNumber not available"));
}
// We can move close to the startingChangeNumber.
@@ -607,15 +619,21 @@
}
cursor = localCursor;
}
- catch (Exception e)
+ catch (ChangelogException e)
{
// Unlocking is required before throwing any exception
closeAndReleaseReadLock(localCursor);
throw e;
}
+ catch (DatabaseException e)
+ {
+ // Unlocking is required before throwing any exception
+ closeAndReleaseReadLock(localCursor);
+ throw new ChangelogException(e);
+ }
}
- private ReplServerDBCursor() throws Exception
+ private ReplServerDBCursor() throws ChangelogException
{
key = new DatabaseEntry();
data = new DatabaseEntry();
@@ -644,22 +662,32 @@
txn = localTxn;
cursor = localCursor;
}
+ catch (ChangelogException e)
+ {
+ closeAndReleaseReadLock(localCursor);
+ abort(localTxn);
+ throw e;
+ }
catch (Exception e)
{
closeAndReleaseReadLock(localCursor);
+ abort(localTxn);
+ throw new ChangelogException(e);
+ }
+ }
- if (localTxn != null)
+ private void abort(Transaction localTxn)
+ {
+ if (localTxn != null)
+ {
+ try
{
- try
- {
- localTxn.abort();
- }
- catch (DatabaseException ignore)
- {
- // Ignore.
- }
+ localTxn.abort();
}
- throw e;
+ catch (DatabaseException ignore)
+ {
+ // Ignore.
+ }
}
}
@@ -689,7 +717,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
}
}
@@ -722,7 +750,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
}
}
@@ -731,20 +759,27 @@
* Get the next ChangeNumber in the database from this Cursor.
*
* @return The next ChangeNumber in the database from this cursor.
- * @throws DatabaseException In case of underlying database problem.
+ * @throws ChangelogException In case of underlying database problem.
*/
- public ChangeNumber nextChangeNumber() throws DatabaseException
+ public ChangeNumber nextChangeNumber() throws ChangelogException
{
if (isClosed)
{
return null;
}
- if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
+ try
{
- return null;
+ if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ return null;
+ }
+ return toChangeNumber(key.getData());
}
- return toChangeNumber(key.getData());
+ catch (DatabaseException e)
+ {
+ throw new ChangelogException(e);
+ }
}
/**
@@ -807,26 +842,32 @@
/**
* Delete the record at the current cursor position.
*
- * @throws DatabaseException In case of database problem.
+ * @throws ChangelogException In case of database problem.
*/
- public void delete() throws DatabaseException
+ public void delete() throws ChangelogException
{
if (isClosed)
{
throw new IllegalStateException("ReplServerDBCursor already closed");
}
- cursor.delete();
+ try
+ {
+ cursor.delete();
+ }
+ catch (DatabaseException e)
+ {
+ throw new ChangelogException(e);
+ }
}
- } // ReplServerDBCursor
+ }
/**
* Clears this change DB from the changes it contains.
*
- * @throws Exception Throws an exception it occurs.
- * @throws DatabaseException Throws a DatabaseException when it occurs.
+ * @throws ChangelogException In case of database problem.
*/
- public void clear() throws Exception, DatabaseException
+ public void clear() throws ChangelogException
{
// The coming users will be blocked until the clear is done
dbCloseLock.writeLock().lock();
@@ -915,7 +956,7 @@
}
catch (DatabaseException e)
{
- replicationServer.handleUnexpectedDatabaseException(e);
+ handleUnexpectedDatabaseException(e);
}
finally
{
@@ -927,6 +968,7 @@
private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
+ throws DatabaseException
{
Cursor cursor = db.openCursor(null, null);
try
@@ -981,6 +1023,7 @@
private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
+ throws DatabaseException
{
Cursor cursor = db.openCursor(null, null);
try
--
Gitblit v1.10.0