From 700dffb800fea545d109919f59d168f09b37c05e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 10 Oct 2013 14:31:02 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java | 6 --
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 37 +++++-------
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 43 +++++++++----
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java | 11 ---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 64 ---------------------
5 files changed, 45 insertions(+), 116 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 8acee72..c9c2a07 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -598,22 +598,23 @@
return null;
}
- final String crossDomainStartState = oldestRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestRecord.getChangeNumber());
- return crossDomainStartState;
+ cnIndexDBCursor =
+ getCursorFrom(cnIndexDB, oldestRecord.getChangeNumber());
+ return oldestRecord.getPreviousCookie();
}
// Request filter DOES contain a startChangeNumber
// Read the CNIndexDB to see whether it contains startChangeNumber
- CNIndexRecord startRecord = cnIndexDB.getRecord(startChangeNumber);
+ DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(startChangeNumber);
+ final CNIndexRecord startRecord = cursor.getRecord();
if (startRecord != null)
{
// found the provided startChangeNumber, let's return it
- final String crossDomainStartState = startRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
- return crossDomainStartState;
+ cnIndexDBCursor = cursor;
+ return startRecord.getPreviousCookie();
}
+ close(cursor);
// startChangeNumber provided in the request IS NOT in the CNIndexDB
@@ -630,17 +631,18 @@
// the DB, let's use the lower limit.
if (startChangeNumber < oldestChangeNumber)
{
- CNIndexRecord oldestRecord = cnIndexDB.getRecord(oldestChangeNumber);
+ cursor = cnIndexDB.getCursorFrom(oldestChangeNumber);
+ final CNIndexRecord oldestRecord = cursor.getRecord();
if (oldestRecord == null)
{
// This should not happen
+ close(cursor);
isEndOfCNIndexDBReached = true;
return null;
}
- final String crossDomainStartState = oldestRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestChangeNumber);
- return crossDomainStartState;
+ cnIndexDBCursor = cursor;
+ return oldestRecord.getPreviousCookie();
}
else if (startChangeNumber <= newestChangeNumber)
{
@@ -653,9 +655,9 @@
return null;
}
- final String crossDomainStartState = newestRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(newestRecord.getChangeNumber());
- return crossDomainStartState;
+ cnIndexDBCursor =
+ getCursorFrom(cnIndexDB, newestRecord.getChangeNumber());
+ return newestRecord.getPreviousCookie();
// TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ...
// this may be very long. Work on perf improvement here.
@@ -665,6 +667,19 @@
throw new DirectoryException(ResultCode.SUCCESS, Message.raw(""));
}
+ private DBCursor<CNIndexRecord> getCursorFrom(ChangeNumberIndexDB cnIndexDB,
+ long startChangeNumber) throws ChangelogException
+ {
+ DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(startChangeNumber);
+ if (cursor.getRecord() == null)
+ {
+ close(cursor);
+ throw new ChangelogException(Message.raw("Change Number "
+ + startChangeNumber + " is not available in the Changelog"));
+ }
+ return cursor;
+ }
+
/**
* Initialize the context for each domain.
* @param providedCookie the provided generalized state
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index 1786e10..f578665 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -51,17 +51,6 @@
long getLastGeneratedChangeNumber();
/**
- * Get the record associated to a provided change number.
- *
- * @param changeNumber
- * the provided change number.
- * @return the {@link CNIndexRecord}, null when none.
- * @throws ChangelogException
- * if a database problem occurs.
- */
- CNIndexRecord getRecord(long changeNumber) throws ChangelogException;
-
- /**
* Get the oldest record stored in this DB.
*
* @return Returns the oldest {@link CNIndexRecord} in this DB, null when the
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index acf81f5..6fb9407 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -32,7 +32,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
@@ -220,24 +219,6 @@
return getNewestRecord() == null;
}
- /**
- * Get a read cursor on the database from a provided key. The cursor MUST be
- * closed after use.
- * <p>
- * This method is only used by unit tests.
- *
- * @param startChangeNumber
- * The change number from where to start.
- * @return the new cursor.
- * @throws ChangelogException
- * if a database problem occurs.
- */
- DraftCNDBCursor getReadCursor(long startChangeNumber)
- throws ChangelogException
- {
- return db.openReadCursor(startChangeNumber);
- }
-
/** {@inheritDoc} */
@Override
public DBCursor<CNIndexRecord> getCursorFrom(long startChangeNumber)
@@ -564,49 +545,4 @@
newestChangeNumber = getChangeNumber(db.readLastRecord());
}
- private ReentrantLock lock = new ReentrantLock();
-
- /**
- * Tests if the current thread has the lock on this object.
- * @return True if the current thread has the lock.
- */
- public boolean hasLock()
- {
- return lock.getHoldCount() > 0;
- }
-
- /**
- * Takes the lock on this object (blocking until lock can be acquired).
- * @throws InterruptedException If interrupted.
- */
- public void lock() throws InterruptedException
- {
- lock.lockInterruptibly();
- }
-
- /**
- * Releases the lock on this object.
- */
- public void release()
- {
- lock.unlock();
- }
-
- /** {@inheritDoc} */
- @Override
- public CNIndexRecord getRecord(long changeNumber)
- throws ChangelogException
- {
- DraftCNDBCursor cursor = null;
- try
- {
- cursor = db.openReadCursor(changeNumber);
- return cursor.currentRecord();
- }
- finally
- {
- close(cursor);
- }
- }
-
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
index 641c7ee..57db6d4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
@@ -27,7 +27,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
@@ -59,11 +58,6 @@
throws ChangelogException
{
draftCNDbCursor = db.openReadCursor(startChangeNumber);
- if (draftCNDbCursor.currentRecord() == null)
- {
- throw new ChangelogException(Message.raw("Change Number "
- + startChangeNumber + " is not available in the Changelog"));
- }
}
/** {@inheritDoc} */
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 1e6b905..0409d34 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -38,7 +38,6 @@
import org.opends.server.replication.server.changelog.api.CNIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.Test;
@@ -96,23 +95,19 @@
assertEquals(oldestCN, cn1);
assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
- DraftCNDBCursor dbc = cnIndexDB.getReadCursor(oldestCN);
+ DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
try
{
- assertEqualTo(dbc.currentRecord(), csns[0], baseDN1, value1);
- assertTrue(dbc.toString().length() != 0);
-
- assertTrue(dbc.next());
- assertEqualTo(dbc.currentRecord(), csns[1], baseDN2, value2);
-
- assertTrue(dbc.next());
- assertEqualTo(dbc.currentRecord(), csns[2], baseDN3, value3);
-
- assertFalse(dbc.next());
+ assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1);
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2);
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3);
+ assertFalse(cursor.next());
}
finally
{
- StaticUtils.close(dbc);
+ StaticUtils.close(cursor);
}
// Now test that the trimming thread does its job => start it
@@ -136,11 +131,11 @@
}
}
- private void assertEqualTo(CNIndexRecord data, CSN csn, DN baseDN, String cookie)
+ private void assertEqualTo(CNIndexRecord record, CSN csn, DN baseDN, String cookie)
{
- assertEquals(data.getCSN(), csn);
- assertEquals(data.getBaseDN(), baseDN);
- assertEquals(data.getPreviousCookie(), cookie);
+ assertEquals(record.getCSN(), csn);
+ assertEquals(record.getBaseDN(), baseDN);
+ assertEquals(record.getPreviousCookie(), cookie);
}
private JEChangeNumberIndexDB newCNIndexDB(ReplicationServer rs) throws Exception
@@ -256,14 +251,14 @@
}
private void assertCursorReadsInOrder(DBCursor<CNIndexRecord> cursor,
- long... sns) throws ChangelogException
+ long... cns) throws ChangelogException
{
try
{
- for (int i = 0; i < sns.length; i++)
+ for (int i = 0; i < cns.length; i++)
{
- assertEquals(cursor.getRecord().getChangeNumber(), sns[i]);
- final boolean isNotLast = i + 1 < sns.length;
+ assertEquals(cursor.getRecord().getChangeNumber(), cns[i]);
+ final boolean isNotLast = i + 1 < cns.length;
assertEquals(cursor.next(), isNotLast);
}
}
--
Gitblit v1.10.0