From caca80367f4fa2843338da673c7def60bb012961 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 04 Sep 2013 12:34:42 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 231 ++++++++++++++++---------------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java | 40 ++++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java | 34 -----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 6
5 files changed, 141 insertions(+), 173 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 168a69d..e0b585d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1347,9 +1347,11 @@
* provided oldestChange, <code>false</code> otherwise
* @throws DirectoryException
* if any problem occur
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
- throws DirectoryException
+ throws DirectoryException, ChangelogException
{
// We also need to check if the draftCNdb is consistent with
// the changelogdb.
@@ -1451,7 +1453,7 @@
}
private void assignNewDraftCNAndStore(ECLUpdateMsg change)
- throws DirectoryException
+ throws DirectoryException, ChangelogException
{
// generate a new change number and assign to this change
change.setChangeNumber(replicationServer.getNewChangeNumber());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 59ce757..90bb23f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -586,8 +586,7 @@
* Enable the ECL access by creating a dedicated workflow element.
* @throws DirectoryException when an error occurs.
*/
- public void enableECL()
- throws DirectoryException
+ public void enableECL() throws DirectoryException
{
if (externalChangeLogWorkflowImpl!=null)
{
@@ -661,8 +660,7 @@
{
Message message =
NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e.toString());
- throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
- message, e);
+ throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e);
}
}
@@ -679,19 +677,14 @@
// internalNetworkGroup?
NetworkGroup internalNetworkGroup = NetworkGroup
.getInternalNetworkGroup();
- internalNetworkGroup
- .deregisterWorkflow(externalChangeLogWorkflowID);
+ internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
// FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
- NetworkGroup adminNetworkGroup = NetworkGroup
- .getAdminNetworkGroup();
- adminNetworkGroup
- .deregisterWorkflow(externalChangeLogWorkflowID);
+ NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
+ adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
- NetworkGroup defaultNetworkGroup = NetworkGroup
- .getDefaultNetworkGroup();
- defaultNetworkGroup
- .deregisterWorkflow(externalChangeLogWorkflowID);
+ NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
+ defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
eclwf.deregister();
eclwf.finalizeWorkflow();
@@ -705,11 +698,26 @@
eclwe.finalizeWorkflowElement();
}
+ shutdownCNIndexDB();
+ }
+
+ private void shutdownCNIndexDB()
+ {
synchronized (cnIndexDBLock)
{
if (cnIndexDB != null)
{
- cnIndexDB.shutdown();
+ try
+ {
+ cnIndexDB.shutdown();
+ }
+ catch (ChangelogException ignored)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
+ }
+ }
}
}
}
@@ -1385,17 +1393,7 @@
}
}
- try
- {
- cnIndexDB.shutdown();
- }
- catch (Exception ignored)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
- }
- }
+ shutdownCNIndexDB();
lastGeneratedChangeNumber = 0;
cnIndexDB = null;
@@ -1639,7 +1637,7 @@
if (cnIndexDB == null)
{
cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
- lastGeneratedChangeNumber = getLastChangeNumber();
+ lastGeneratedChangeNumber = cnIndexDB.getLastChangeNumber();
}
return cnIndexDB;
}
@@ -1654,40 +1652,6 @@
}
/**
- * Get the value of the first change number, 0 when db is empty.
- *
- * @return the first value.
- */
- public long getFirstChangeNumber()
- {
- synchronized (cnIndexDBLock)
- {
- if (cnIndexDB != null)
- {
- return cnIndexDB.getFirstChangeNumber();
- }
- return 0;
- }
- }
-
- /**
- * Get the value of the last change number, 0 when db is empty.
- *
- * @return the last value.
- */
- public long getLastChangeNumber()
- {
- synchronized (cnIndexDBLock)
- {
- if (cnIndexDB != null)
- {
- return cnIndexDB.getLastChangeNumber();
- }
- return 0;
- }
- }
-
- /**
* Generate a new change number.
*
* @return The generated change number
@@ -1739,89 +1703,96 @@
boolean dbEmpty = false;
final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
- long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
- Map<String, ServerState> domainsServerStateForLastCN = null;
- CSN csnForLastCN = null;
- String domainForLastCN = null;
- if (firstChangeNumber < 1)
+ try
{
- dbEmpty = true;
- firstChangeNumber = 0;
- lastChangeNumber = 0;
- }
- else
- {
- lastChangeNumber = cnIndexDB.getLastChangeNumber();
-
- // Get the generalized state associated with the current last change
- // number and initializes from it the startStates table
- String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
- if (lastCNGenState != null && lastCNGenState.length() > 0)
+ long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
+ Map<String, ServerState> domainsServerStateForLastCN = null;
+ CSN csnForLastCN = null;
+ String domainForLastCN = null;
+ if (firstChangeNumber < 1)
{
- domainsServerStateForLastCN =
- MultiDomainServerState.splitGenStateToServerStates(lastCNGenState);
- }
-
- csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
- domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
- }
-
- long newestDate = 0;
- for (ReplicationServerDomain rsd : getReplicationServerDomains())
- {
- if (contains(excludedBaseDNs, rsd.getBaseDn()))
- continue;
-
- // for this domain, have the state in the replchangelog
- // where the last change number update is
- long ec;
- if (domainsServerStateForLastCN == null)
- {
- // Count changes of this domain from the beginning of the changelog
- CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
- ec = rsd.getEligibleCount(
- rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
- crossDomainEligibleCSN);
+ dbEmpty = true;
+ firstChangeNumber = 0;
+ lastChangeNumber = 0;
}
else
{
- // There are records in the draftDB (so already returned to clients)
- // BUT
- // There is nothing related to this domain in the last draft record
- // (may be this domain was disabled when this record was returned).
- // In that case, are counted the changes from
- // the date of the most recent change from this last draft record
- if (newestDate == 0)
+ lastChangeNumber = cnIndexDB.getLastChangeNumber();
+
+ // Get the generalized state associated with the current last change
+ // number and initializes from it the startStates table
+ String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
+ if (lastCNGenState != null && lastCNGenState.length() > 0)
{
- newestDate = csnForLastCN.getTime();
+ domainsServerStateForLastCN = MultiDomainServerState
+ .splitGenStateToServerStates(lastCNGenState);
}
- // And count changes of this domain from the date of the
- // lastseqnum record (that does not refer to this domain)
- CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
- ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
-
- if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn()))
- ec--;
+ csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
+ domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
}
- // cumulates on domains
- lastChangeNumber += ec;
+ long newestDate = 0;
+ for (ReplicationServerDomain rsd : getReplicationServerDomains())
+ {
+ if (contains(excludedBaseDNs, rsd.getBaseDn()))
+ continue;
- // CNIndexDB is empty and there are eligible updates in the replication
- // changelog then init first change number
- if (ec > 0 && firstChangeNumber == 0)
- firstChangeNumber = 1;
+ // for this domain, have the state in the replchangelog
+ // where the last change number update is
+ long ec;
+ if (domainsServerStateForLastCN == null)
+ {
+ // Count changes of this domain from the beginning of the changelog
+ CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
+ ec = rsd.getEligibleCount(
+ rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
+ crossDomainEligibleCSN);
+ }
+ else
+ {
+ // There are records in the draftDB (so already returned to clients)
+ // BUT
+ // There is nothing related to this domain in the last draft record
+ // (may be this domain was disabled when this record was returned).
+ // In that case, are counted the changes from
+ // the date of the most recent change from this last draft record
+ if (newestDate == 0)
+ {
+ newestDate = csnForLastCN.getTime();
+ }
+
+ // And count changes of this domain from the date of the
+ // lastseqnum record (that does not refer to this domain)
+ CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
+ ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
+
+ if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn()))
+ ec--;
+ }
+
+ // cumulates on domains
+ lastChangeNumber += ec;
+
+ // CNIndexDB is empty and there are eligible updates in the replication
+ // changelog then init first change number
+ if (ec > 0 && firstChangeNumber == 0)
+ firstChangeNumber = 1;
+ }
+
+ if (dbEmpty)
+ {
+ // The database was empty, just keep increasing numbers since last time
+ // we generated one change number.
+ firstChangeNumber += lastGeneratedChangeNumber;
+ lastChangeNumber += lastGeneratedChangeNumber;
+ }
+ return new long[] { firstChangeNumber, lastChangeNumber };
}
-
- if (dbEmpty)
+ catch (ChangelogException e)
{
- // The database was empty, just keep increasing numbers since last time
- // we generated one change number.
- firstChangeNumber += lastGeneratedChangeNumber;
- lastChangeNumber += lastGeneratedChangeNumber;
+ throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
}
- return new long[] { firstChangeNumber, lastChangeNumber };
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index 57faad5..ccd70e2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -49,8 +49,10 @@
* @param changeNumber
* the provided change number.
* @return the associated CSN, null when none.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- public CSN getCSN(long changeNumber);
+ public CSN getCSN(long changeNumber) throws ChangelogException;
/**
* Get the baseDN associated to a provided change number.
@@ -58,8 +60,10 @@
* @param changeNumber
* the provided change number.
* @return the baseDN, null when none.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- public String getBaseDN(long changeNumber);
+ public String getBaseDN(long changeNumber) throws ChangelogException;
/**
* Get the previous cookie associated to a provided change number.
@@ -67,22 +71,28 @@
* @param changeNumber
* the provided change number.
* @return the previous cookie, null when none.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- String getPreviousCookie(long changeNumber);
+ String getPreviousCookie(long changeNumber) throws ChangelogException;
/**
* Get the first change number stored in this DB.
*
* @return Returns the first change number in this DB.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- long getFirstChangeNumber();
+ long getFirstChangeNumber() throws ChangelogException;
/**
* Get the last change number stored in this DB.
*
* @return Returns the last change number in this DB
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- long getLastChangeNumber();
+ long getLastChangeNumber() throws ChangelogException;
/**
* Add an update to the list of messages that must be saved to this DB managed
@@ -99,8 +109,11 @@
* The associated baseDN.
* @param csn
* The associated replication CSN.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- void add(long changeNumber, String previousCookie, String baseDN, CSN csn);
+ void add(long changeNumber, String previousCookie, String baseDN, CSN csn)
+ throws ChangelogException;
/**
* Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
@@ -113,7 +126,7 @@
* this DBHandler and starting at the position defined by a given
* changeNumber.
* @throws ChangelogException
- * if a database problem happened.
+ * if a database problem occurs.
*/
ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
throws ChangelogException;
@@ -123,14 +136,16 @@
*
* @return <code>true</code> if this database is empty, <code>false</code>
* otherwise
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- boolean isEmpty();
+ boolean isEmpty() throws ChangelogException;
/**
* Clear the changes from this DB (from both memory cache and DB storage).
*
* @throws ChangelogException
- * When an exception occurs while removing the changes from this DB.
+ * if a database problem occurs.
*/
void clear() throws ChangelogException;
@@ -142,13 +157,16 @@
* The baseDN for which we want to remove all records from this DB,
* null means all.
* @throws ChangelogException
- * When an exception occurs while removing the changes from this DB.
+ * if a database problem occurs.
*/
void clear(String baseDNToClear) throws ChangelogException;
/**
* Shutdown this DB.
+ *
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- void shutdown();
+ void shutdown() throws ChangelogException;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
index c3a0c31..655de4a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
@@ -33,6 +33,9 @@
/**
* This cursor allows to iterate through the changes received from a given
* replica (Directory Server) in the topology.
+ * <p>
+ * Instances of this class are sorted in the order defined by the CSN of the
+ * current {@link UpdateMsg}, i.e. the cursor with the oldest CSN comes first.
*/
public interface ReplicaDBCursor extends Closeable, Comparable<ReplicaDBCursor>
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
index 3d57e71..7d1ce8b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -516,7 +516,7 @@
/** {@inheritDoc} */
@Override
- public String getPreviousCookie(long changeNumber)
+ public String getPreviousCookie(long changeNumber) throws ChangelogException
{
DraftCNDBCursor cursor = null;
try
@@ -524,11 +524,6 @@
cursor = db.openReadCursor(changeNumber);
return cursor.currentValue();
}
- catch(Exception e)
- {
- debugException("getValue", changeNumber, e);
- return null;
- }
finally
{
close(cursor);
@@ -537,7 +532,7 @@
/** {@inheritDoc} */
@Override
- public CSN getCSN(long changeNumber)
+ public CSN getCSN(long changeNumber) throws ChangelogException
{
DraftCNDBCursor cursor = null;
try
@@ -545,20 +540,15 @@
cursor = db.openReadCursor(changeNumber);
return cursor.currentCSN();
}
- catch(Exception e)
- {
- debugException("getCSN", changeNumber, e);
- return null;
- }
finally
{
close(cursor);
}
}
- /**{@inheritDoc}*/
+ /** {@inheritDoc} */
@Override
- public String getBaseDN(long changeNumber)
+ public String getBaseDN(long changeNumber) throws ChangelogException
{
DraftCNDBCursor cursor = null;
try
@@ -566,25 +556,9 @@
cursor = db.openReadCursor(changeNumber);
return cursor.currentBaseDN();
}
- catch(Exception e)
- {
- debugException("getBaseDN", changeNumber, e);
- return null;
- }
finally
{
close(cursor);
}
}
-
- private void debugException(String methodName, long changeNumber, Exception e)
- {
- if (debugEnabled())
- TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: "
- + " key=" + changeNumber + " value returned is null"
- + " first="+ db.readFirstChangeNumber()
- + " last=" + db.readLastChangeNumber()
- + " count=" + db.count()
- + " exception " + e + " " + e.getMessage());
- }
}
--
Gitblit v1.10.0