From 0d570fd6e60caa08e90e944857ba2d97ccee998b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Sep 2013 07:51:54 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 30 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java | 107 ++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java | 16
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java | 55 +--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/CNIndexData.java | 113 +++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java | 183 +++--------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java | 137 +++++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java | 29 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 97 +++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java | 22 -
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java | 105 ++++--
11 files changed, 444 insertions(+), 450 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index e0b585d..ee5f785 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,7 +41,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
@@ -71,7 +74,7 @@
/**
* Specifies the last changer number requested.
*/
- private int lastChangeNumber = 0;
+ private long lastChangeNumber = 0;
/**
* Specifies whether the change number db has been read until its end.
*/
@@ -522,7 +525,7 @@
* @throws DirectoryException
* When an error is raised.
*/
- private void initializeCLSearchFromChangeNumber(int startChangeNumber)
+ private void initializeCLSearchFromChangeNumber(long startChangeNumber)
throws DirectoryException
{
try
@@ -535,13 +538,13 @@
catch(DirectoryException de)
{
TRACER.debugCaught(DebugLogLevel.ERROR, de);
- releaseIterator();
+ releaseCursor();
throw de;
}
catch(Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- releaseIterator();
+ releaseCursor();
throw new DirectoryException(
ResultCode.OPERATIONS_ERROR,
Message.raw(Category.SYNC,
@@ -561,9 +564,8 @@
* @throws DirectoryException
* if a database problem occurred
*/
- private String findCookie(final int startChangeNumber)
- throws ChangelogException,
- DirectoryException
+ private String findCookie(final long startChangeNumber)
+ throws ChangelogException, DirectoryException
{
final ChangeNumberIndexDB cnIndexDB =
replicationServer.getChangeNumberIndexDB();
@@ -581,9 +583,9 @@
return null;
}
- final long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
- final String crossDomainStartState =
- cnIndexDB.getPreviousCookie(firstChangeNumber);
+ final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData();
+ final long firstChangeNumber = firstCNData.getChangeNumber();
+ final String crossDomainStartState = firstCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
return crossDomainStartState;
}
@@ -591,11 +593,11 @@
// Request filter DOES contain a startChangeNumber
// Read the draftCNDb to see whether it contains startChangeNumber
- String crossDomainStartState =
- cnIndexDB.getPreviousCookie(startChangeNumber);
- if (crossDomainStartState != null)
+ CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber);
+ if (startCNData != null)
{
// found the provided startChangeNumber, let's return it
+ final String crossDomainStartState = startCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
return crossDomainStartState;
}
@@ -615,9 +617,10 @@
// the DB, let's use the lower limit.
if (startChangeNumber < firstChangeNumber)
{
- crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber);
- if (crossDomainStartState != null)
+ CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber);
+ if (firstCNData != null)
{
+ final String crossDomainStartState = firstCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
return crossDomainStartState;
}
@@ -636,8 +639,9 @@
return null;
}
- final long lastKey = cnIndexDB.getLastChangeNumber();
- crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
+ final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData();
+ final long lastKey = lastCNData.getChangeNumber();
+ final String crossDomainStartState = lastCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
return crossDomainStartState;
@@ -897,7 +901,7 @@
{
if (debugEnabled())
TRACER.debugInfo(this + " shutdown()");
- releaseIterator();
+ releaseCursor();
for (DomainContext domainCtxt : domainCtxts) {
if (!domainCtxt.unRegisterHandler()) {
logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -910,7 +914,7 @@
domainCtxts = null;
}
- private void releaseIterator()
+ private void releaseCursor()
{
if (this.cnIndexDBCursor != null)
{
@@ -1256,13 +1260,10 @@
oldestContext.currentState.update(
change.getUpdateMsg().getCSN());
- if (oldestContext.currentState.cover(oldestContext.stopState))
- {
- oldestContext.active = false;
- }
- if (draftCompat
- && lastChangeNumber > 0
- && change.getChangeNumber() > lastChangeNumber)
+ if (oldestContext.currentState.cover(oldestContext.stopState)
+ || (draftCompat
+ && lastChangeNumber > 0
+ && change.getChangeNumber() > lastChangeNumber))
{
oldestContext.active = false;
}
@@ -1278,8 +1279,9 @@
if (searchPhase == PERSISTENT_PHASE)
{
if (debugEnabled())
- clDomCtxtsToString("In getNextECLUpdate (persistent): " +
- "looking for the generalized oldest change");
+ TRACER.debugInfo(clDomCtxtsToString(
+ "In getNextECLUpdate (persistent): "
+ + "looking for the generalized oldest change"));
for (DomainContext domainCtxt : domainCtxts) {
domainCtxt.getNextEligibleMessageForDomain(operationId);
@@ -1300,7 +1302,7 @@
if (draftCompat)
{
- assignNewDraftCNAndStore(change);
+ assignNewChangeNumberAndStore(change);
}
oldestChange = change;
}
@@ -1317,21 +1319,19 @@
if (oldestChange != null)
{
+ final CSN csn = oldestChange.getUpdateMsg().getCSN();
if (debugEnabled())
- TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
- + oldestChange.getUpdateMsg().getCSN());
+ TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
// Update the current state
- previousCookie.update(
- oldestChange.getBaseDN(),
- oldestChange.getUpdateMsg().getCSN());
+ previousCookie.update(oldestChange.getBaseDN(), csn);
// Set the current value of global state in the returned message
oldestChange.setCookie(previousCookie);
if (debugEnabled())
- TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
- oldestChange);
+ TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
+ + oldestChange);
}
return oldestChange;
@@ -1370,14 +1370,15 @@
if (isEndOfCNIndexDBReached)
{
// we are at the end of the DraftCNdb in the append mode
- assignNewDraftCNAndStore(oldestChange);
+ assignNewChangeNumberAndStore(oldestChange);
return true;
}
// the next change from the CNIndexDB
- CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN();
- String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN();
+ final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData();
+ final CSN csnFromDraftCNDb = cnIndexData.getCSN();
+ final String dnFromDraftCNDb = cnIndexData.getBaseDN();
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() generating change number "
@@ -1392,10 +1393,10 @@
{
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() generating change number "
- + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber()
+ + " assigning changeNumber=" + cnIndexData.getChangeNumber()
+ " to change=" + oldestChange);
- oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber());
+ oldestChange.setChangeNumber(cnIndexData.getChangeNumber());
return true;
}
@@ -1429,8 +1430,8 @@
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() generating change number has"
- + "skipped to changeNumber=" + cnIndexDBCursor.getChangeNumber()
- + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?"
+ + "skipped to changeNumber=" + cnIndexData.getChangeNumber()
+ + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?"
+ isEndOfCNIndexDBReached);
}
catch (ChangelogException e)
@@ -1452,7 +1453,7 @@
return sameDN && sameCSN;
}
- private void assignNewDraftCNAndStore(ECLUpdateMsg change)
+ private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
throws DirectoryException, ChangelogException
{
// generate a new change number and assign to this change
@@ -1460,11 +1461,11 @@
// store in CNIndexDB the pair
// (change number of the current change, state before this change)
- replicationServer.getChangeNumberIndexDB().add(
+ replicationServer.getChangeNumberIndexDB().add(new CNIndexData(
change.getChangeNumber(),
previousCookie.toString(),
change.getBaseDN(),
- change.getUpdateMsg().getCSN());
+ change.getUpdateMsg().getCSN()));
}
/**
@@ -1499,7 +1500,7 @@
}
// End of INIT_PHASE => always release the iterator
- releaseIterator();
+ releaseCursor();
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 90bb23f..11f6507 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,6 +52,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DbHandler;
@@ -1699,37 +1700,36 @@
* (this diff is done domain by domain)
*/
- long lastChangeNumber;
- boolean dbEmpty = false;
final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
-
try
{
- long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
+ boolean dbEmpty = true;
+ long firstChangeNumber = 0;
+ long lastChangeNumber = 0;
+
+ final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData();
+ final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData();
+
Map<String, ServerState> domainsServerStateForLastCN = null;
CSN csnForLastCN = null;
String domainForLastCN = null;
- if (firstChangeNumber < 1)
+ if (firstCNData != null)
{
- dbEmpty = true;
- firstChangeNumber = 0;
- lastChangeNumber = 0;
- }
- else
- {
- lastChangeNumber = cnIndexDB.getLastChangeNumber();
+ dbEmpty = false;
+ firstChangeNumber = firstCNData.getChangeNumber();
+ lastChangeNumber = lastCNData.getChangeNumber();
// Get the generalized state associated with the current last change
// number and initializes from it the startStates table
- String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
+ String lastCNGenState = lastCNData.getPreviousCookie();
if (lastCNGenState != null && lastCNGenState.length() > 0)
{
domainsServerStateForLastCN = MultiDomainServerState
.splitGenStateToServerStates(lastCNGenState);
}
- csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
- domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
+ csnForLastCN = lastCNData.getCSN();
+ domainForLastCN = lastCNData.getBaseDN();
}
long newestDate = 0;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/CNIndexData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/CNIndexData.java
new file mode 100644
index 0000000..9a4348e
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/CNIndexData.java
@@ -0,0 +1,113 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import org.opends.server.replication.common.CSN;
+
+/**
+ * The Change Number Index Data class represents records stored in the
+ * {@link ChangeNumberIndexDB}.
+ */
+public class CNIndexData
+{
+
+ /** This is the key used to store the rest of the . */
+ private long changeNumber;
+ private String previousCookie;
+ private String baseDN;
+ private CSN csn;
+
+ /**
+ * Builds an instance of this class.
+ *
+ * @param changeNumber
+ * the change number
+ * @param previousCookie
+ * the previous cookie
+ * @param baseDN
+ * the baseDN
+ * @param csn
+ * the replication CSN field
+ */
+ public CNIndexData(long changeNumber, String previousCookie, String baseDN,
+ CSN csn)
+ {
+ super();
+ this.changeNumber = changeNumber;
+ this.previousCookie = previousCookie;
+ this.baseDN = baseDN;
+ this.csn = csn;
+ }
+
+ /**
+ * Getter for the baseDN field.
+ *
+ * @return the baseDN
+ */
+ public String getBaseDN()
+ {
+ return baseDN;
+ }
+
+ /**
+ * Getter for the replication CSN field.
+ *
+ * @return The replication CSN field.
+ */
+ public CSN getCSN()
+ {
+ return csn;
+ }
+
+ /**
+ * Getter for the change number field.
+ *
+ * @return The change number field.
+ */
+ public long getChangeNumber()
+ {
+ return changeNumber;
+ }
+
+ /**
+ * Get the previous cookie field.
+ *
+ * @return the previous cookie.
+ */
+ public String getPreviousCookie()
+ {
+ return previousCookie;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN
+ + " previousCookie=" + previousCookie;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index ccd70e2..ab5abb1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -26,13 +26,12 @@
*/
package org.opends.server.replication.server.changelog.api;
-import org.opends.server.replication.common.CSN;
-
/**
- * This class stores an index of all the changes seen by this server. The index
- * is sorted by a global ordering as defined in the CSN class. The index links a
- * <code>changeNumber</code> to the corresponding {@link CSN}. The {@link CSN}
- * then links to a corresponding change in one of the {@link ReplicaDB}s.
+ * This class stores an index of all the changes seen by this server in the form
+ * of {@link CNIndexData}. The index is sorted by a global ordering as defined
+ * in the CSN class. The index links a <code>changeNumber</code> to the
+ * corresponding CSN. The CSN then links to a corresponding change in one of the
+ * ReplicaDBs.
*
* @see <a href=
* "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -44,46 +43,33 @@
{
/**
- * Get the CSN associated to a provided change number.
+ * Get the {@link CNIndexData} record associated to a provided change number.
*
* @param changeNumber
* the provided change number.
- * @return the associated CSN, null when none.
+ * @return the {@link CNIndexData} record, null when none.
* @throws ChangelogException
* if a database problem occurs.
*/
- public CSN getCSN(long changeNumber) throws ChangelogException;
+ CNIndexData getCNIndexData(long changeNumber) throws ChangelogException;
/**
- * Get the baseDN associated to a provided change number.
+ * Get the first {@link CNIndexData} record stored in this DB.
*
- * @param changeNumber
- * the provided change number.
- * @return the baseDN, null when none.
+ * @return Returns the first {@link CNIndexData} record in this DB.
* @throws ChangelogException
* if a database problem occurs.
*/
- public String getBaseDN(long changeNumber) throws ChangelogException;
+ CNIndexData getFirstCNIndexData() throws ChangelogException;
/**
- * Get the previous cookie associated to a provided change number.
+ * Get the last {@link CNIndexData} record stored in this DB.
*
- * @param changeNumber
- * the provided change number.
- * @return the previous cookie, null when none.
+ * @return Returns the last {@link CNIndexData} record in this DB
* @throws ChangelogException
* if a database problem occurs.
*/
- String getPreviousCookie(long changeNumber) throws ChangelogException;
-
- /**
- * Get the first change number stored in this DB.
- *
- * @return Returns the first change number in this DB.
- * @throws ChangelogException
- * if a database problem occurs.
- */
- long getFirstChangeNumber() throws ChangelogException;
+ CNIndexData getLastCNIndexData() throws ChangelogException;
/**
* Get the last change number stored in this DB.
@@ -101,19 +87,12 @@
* This method is blocking if the size of the list of message is larger than
* its maximum.
*
- * @param changeNumber
- * The change number for this record in this DB.
- * @param previousCookie
- * The value of the previous cookie.
- * @param baseDN
- * The associated baseDN.
- * @param csn
- * The associated replication CSN.
+ * @param cnIndexData
+ * The {@link CNIndexData} record to add to this DB.
* @throws ChangelogException
* if a database problem occurs.
*/
- void add(long changeNumber, String previousCookie, String baseDN, CSN csn)
- throws ChangelogException;
+ void add(CNIndexData cnIndexData) throws ChangelogException;
/**
* Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
index dcbbd8a..a5a02a3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
@@ -28,8 +28,6 @@
import java.io.Closeable;
-import org.opends.server.replication.common.CSN;
-
/**
* Iterator into the changelog database. Once it is not used anymore, a
* ChangelogDBIterator must be closed to release all the resources into the
@@ -39,25 +37,11 @@
{
/**
- * Getter for the replication CSN field.
+ * Getter for the {@link CNIndexData} record.
*
- * @return The replication CSN field.
+ * @return The replication CNIndexData record.
*/
- CSN getCSN();
-
- /**
- * Getter for the baseDN field.
- *
- * @return The service ID.
- */
- String getBaseDN();
-
- /**
- * Getter for the change number field.
- *
- * @return The change number field.
- */
- long getChangeNumber();
+ CNIndexData getCNIndexData();
/**
* Skip to the next record of the database.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
index e95fa04..7319e1e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -34,7 +34,7 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DebugLogLevel;
@@ -56,7 +56,6 @@
public class DraftCNDB
{
private static final DebugTracer TRACER = getTracer();
- private static final int DATABASE_EMPTY = 0;
private Database db;
private ReplicationDbEnv dbenv;
@@ -83,22 +82,19 @@
/**
* Add an entry to the database.
- * @param changeNumber the provided change number.
*
- * @param value the provided value to be stored associated
- * with this change number.
- * @param domainBaseDN the provided domainBaseDn to be stored associated
- * with this change number.
- * @param csn the provided replication CSN to be
- * stored associated with this change number.
+ * @param cnIndexData
+ * the provided {@link CNIndexData} to be stored.
*/
- public void addEntry(long changeNumber, String value, String domainBaseDN,
- CSN csn)
+ public void addEntry(CNIndexData cnIndexData)
{
try
{
+ final long changeNumber = cnIndexData.getChangeNumber();
DatabaseEntry key = new ReplicationDraftCNKey(changeNumber);
- DatabaseEntry data = new DraftCNData(value, domainBaseDN, csn);
+ DatabaseEntry data =
+ new DraftCNData(changeNumber, cnIndexData.getPreviousCookie(),
+ cnIndexData.getBaseDN(), cnIndexData.getCSN());
// Use a transaction so that we can override durability.
Transaction txn = null;
@@ -220,9 +216,12 @@
/**
* Read the first Change from the database, 0 when none.
+ *
* @return the first change number.
+ * @throws ChangelogException
+ * if a database problem occurred
*/
- public int readFirstChangeNumber()
+ public CNIndexData readFirstCNIndexData() throws ChangelogException
{
try
{
@@ -233,18 +232,18 @@
// If the DB has been closed then return immediately.
if (isDBClosed())
{
- return DATABASE_EMPTY;
+ return null;
}
cursor = db.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
+ ReplicationDraftCNKey key = new ReplicationDraftCNKey();
DatabaseEntry entry = new DatabaseEntry();
if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS)
{
- return DATABASE_EMPTY;
+ return null;
}
- return Integer.parseInt(decodeUTF8(key.getData()));
+ return newCNIndexData(key, entry);
}
finally
{
@@ -254,10 +253,17 @@
catch (DatabaseException e)
{
dbenv.shutdownOnException(e);
- return DATABASE_EMPTY;
+ return null;
}
}
+ private CNIndexData newCNIndexData(ReplicationDraftCNKey key,
+ DatabaseEntry data) throws ChangelogException
+ {
+ return new DraftCNData(key.getChangeNumber(), data.getData())
+ .getCNIndexData();
+ }
+
/**
* Return the record count.
* @return the record count.
@@ -270,7 +276,7 @@
// If the DB has been closed then return immediately.
if (isDBClosed())
{
- return DATABASE_EMPTY;
+ return 0;
}
return db.count();
@@ -283,14 +289,17 @@
{
dbCloseLock.readLock().unlock();
}
- return DATABASE_EMPTY;
+ return 0;
}
/**
* Read the last change number from the database.
+ *
* @return the last change number.
+ * @throws ChangelogException
+ * if a database problem occurred
*/
- public int readLastChangeNumber()
+ public CNIndexData readLastCNIndexData() throws ChangelogException
{
try
{
@@ -301,18 +310,18 @@
// If the DB has been closed then return immediately.
if (isDBClosed())
{
- return DATABASE_EMPTY;
+ return null;
}
cursor = db.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
+ ReplicationDraftCNKey key = new ReplicationDraftCNKey();
DatabaseEntry entry = new DatabaseEntry();
if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS)
{
- return DATABASE_EMPTY;
+ return null;
}
- return Integer.parseInt(decodeUTF8(key.getData()));
+ return newCNIndexData(key, entry);
}
finally
{
@@ -322,7 +331,7 @@
catch (DatabaseException e)
{
dbenv.shutdownOnException(e);
- return DATABASE_EMPTY;
+ return null;
}
}
@@ -348,9 +357,9 @@
* Will be set non null for a write cursor.
*/
private final Transaction txn;
- private final DatabaseEntry key;
- private final DatabaseEntry entry;
- private DraftCNData cnData;
+ private final ReplicationDraftCNKey key;
+ private final DatabaseEntry entry = new DatabaseEntry();
+ private CNIndexData cnIndexData;
private boolean isClosed = false;
@@ -365,7 +374,6 @@
private DraftCNDBCursor(long startChangeNumber) throws ChangelogException
{
this.key = new ReplicationDraftCNKey(startChangeNumber);
- this.entry = new DatabaseEntry();
// Take the lock. From now on, whatever error that happen in the life
// of this cursor should end by unlocking that lock. We must also
@@ -405,12 +413,12 @@
}
else
{
- cnData = new DraftCNData(entry.getData());
+ cnIndexData = newCNIndexData(this.key, entry);
}
}
else
{
- cnData = new DraftCNData(entry.getData());
+ cnIndexData = newCNIndexData(this.key, entry);
}
}
@@ -431,15 +439,12 @@
}
}
-
-
private DraftCNDBCursor() throws ChangelogException
{
Transaction localTxn = null;
Cursor localCursor = null;
- this.key = new DatabaseEntry();
- this.entry = new DatabaseEntry();
+ this.key = new ReplicationDraftCNKey();
// We'll go on only if no close or no clear is running
dbCloseLock.readLock().lock();
@@ -544,105 +549,29 @@
}
/**
- * Getter for the value field of the current cursor.
- * @return The current value field.
- */
- public String currentValue()
- {
- if (isClosed)
- {
- return null;
- }
-
- try
- {
- if (cnData != null)
- {
- return cnData.getValue();
- }
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- return null;
- }
-
- /**
- * Getter for the baseDN field of the current cursor.
- * @return The current baseDN.
- */
- public String currentBaseDN()
- {
- if (isClosed)
- {
- return null;
- }
-
- try
- {
- if (cnData != null)
- {
- return cnData.getBaseDN();
- }
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- return null;
- }
-
- /**
- * Getter for the integer value of the current cursor, representing
- * the current change number being processed.
+ * Returns the {@link CNIndexData} at the current position of the cursor.
*
- * @return the current change number as an integer.
+ * @return The current {@link CNIndexData}.
*/
- public int currentKey()
+ public CNIndexData currentData()
{
if (isClosed)
{
- return -1;
+ return null;
}
try
{
- String str = decodeUTF8(key.getData());
- return Integer.parseInt(str);
+ return cnIndexData;
}
catch (Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- return -1;
- }
-
- /**
- * Returns the replication CSN associated with the current key.
- * @return the replication CSN
- */
- public CSN currentCSN()
- {
- if (isClosed)
- {
return null;
}
-
- try
- {
- if (cnData != null)
- {
- return cnData.getCSN();
- }
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- return null;
}
+
/**
* Go to the next record on the cursor.
* @return the next record on this cursor.
@@ -659,10 +588,10 @@
OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
if (status != OperationStatus.SUCCESS)
{
- cnData = null;
+ cnIndexData = null;
return false;
}
- cnData = new DraftCNData(entry.getData());
+ cnIndexData = newCNIndexData(this.key, entry);
}
catch(Exception e)
{
@@ -693,20 +622,6 @@
}
}
- /**
- * Returns the current key associated with this cursor.
- *
- * @return The current key associated with this cursor.
- */
- public DatabaseEntry getKey()
- {
- if (isClosed)
- {
- return null;
- }
-
- return key;
- }
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
index 205c5de..8335497 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
@@ -31,6 +31,7 @@
import org.opends.messages.Message;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import com.sleepycat.je.DatabaseEntry;
@@ -46,18 +47,25 @@
private static final long serialVersionUID = 1L;
- private String value;
- private String baseDN;
- private CSN csn;
+ private long changeNumber;
+ private CNIndexData cnIndexData;
/**
* Creates a record to be stored in the DraftCNDB.
- * @param previousCookie The previous cookie.
- * @param baseDN The baseDN (domain DN).
- * @param csn The replication CSN.
+ *
+ * @param changeNumber
+ * the change number
+ * @param previousCookie
+ * The previous cookie
+ * @param baseDN
+ * The baseDN (domain DN)
+ * @param csn
+ * The replication CSN
*/
- public DraftCNData(String previousCookie, String baseDN, CSN csn)
+ public DraftCNData(long changeNumber, String previousCookie, String baseDN,
+ CSN csn)
{
+ this.changeNumber = changeNumber;
String record =
previousCookie + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn;
setData(getBytes(record));
@@ -65,29 +73,38 @@
/**
* Creates a record to be stored in the DraftCNDB from the provided byte[].
- * @param data the provided byte[].
- * @throws ChangelogException a.
+ *
+ * @param changeNumber
+ * the change number
+ * @param data
+ * the provided byte[]
+ * @throws ChangelogException
+ * if a database problem occurred
*/
- public DraftCNData(byte[] data) throws ChangelogException
+ public DraftCNData(long changeNumber, byte[] data) throws ChangelogException
{
- decodeData(data);
+ this.changeNumber = changeNumber;
+ this.cnIndexData = decodeData(changeNumber, data);
}
/**
- * Decode a record into fields.
- * @param data the provided byte array.
- * @throws ChangelogException when a problem occurs.
+ * Decode and returns a {@link CNIndexData} record.
+ *
+ * @param changeNumber
+ * @param data
+ * the provided byte array.
+ * @return the decoded {@link CNIndexData} record
+ * @throws ChangelogException
+ * when a problem occurs.
*/
- public void decodeData(byte[] data) throws ChangelogException
+ private CNIndexData decodeData(long changeNumber, byte[] data)
+ throws ChangelogException
{
try
{
String stringData = new String(data, "UTF-8");
-
String[] str = stringData.split(FIELD_SEPARATOR, 3);
- value = str[0];
- baseDN = str[1];
- csn = new CSN(str[2]);
+ return new CNIndexData(changeNumber, str[0], str[1], new CSN(str[2]));
}
catch (UnsupportedEncodingException e)
{
@@ -98,43 +115,17 @@
}
/**
- * Getter for the value.
+ * Getter for the decoded {@link CNIndexData} record.
*
- * @return the value.
- * @throws ChangelogException when a problem occurs.
- */
- public String getValue() throws ChangelogException
- {
- if (value == null)
- decodeData(getData());
- return this.value;
- }
-
- /**
- * Getter for the service ID.
- *
- * @return The baseDN
- * @throws ChangelogException when a problem occurs.
- */
- public String getBaseDN() throws ChangelogException
- {
- if (value == null)
- decodeData(getData());
- return this.baseDN;
- }
-
- /**
- * Getter for the replication CSN.
- *
- * @return the replication CSN.
+ * @return the CNIndexData record.
* @throws ChangelogException
* when a problem occurs.
*/
- public CSN getCSN() throws ChangelogException
+ public CNIndexData getCNIndexData() throws ChangelogException
{
- if (value == null)
- decodeData(getData());
- return this.csn;
+ if (cnIndexData == null)
+ cnIndexData = decodeData(changeNumber, getData());
+ return cnIndexData;
}
/**
@@ -144,19 +135,7 @@
@Override
public String toString()
{
- StringBuilder buffer = new StringBuilder();
- toString(buffer);
- return buffer.toString();
+ return "DraftCNData : [" + cnIndexData + "]";
}
- /**
- * Dump a string representation of these data into the provided buffer.
- * @param buffer the provided buffer.
- */
- public void toString(StringBuilder buffer)
- {
- buffer.append("DraftCNData : [value=").append(value);
- buffer.append("] [serviceID=").append(baseDN);
- buffer.append("] [csn=").append(csn).append("]");
- }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
index 7d1ce8b..5be5a8d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -48,6 +48,7 @@
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
@@ -78,12 +79,12 @@
* FIXME Is this field that useful? {@link #getFirstChangeNumber()} does not
* even use it!
*/
- private int firstChangeNumber = NO_KEY;
+ private long firstChangeNumber = NO_KEY;
/**
* FIXME Is this field that useful? {@link #getLastChangeNumber()} does not
* even use it!
*/
- private int lastChangeNumber = NO_KEY;
+ private long lastChangeNumber = NO_KEY;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private boolean shutdown = false;
private boolean trimDone = false;
@@ -122,8 +123,8 @@
// DB initialization
db = new DraftCNDB(dbenv);
- firstChangeNumber = db.readFirstChangeNumber();
- lastChangeNumber = db.readLastChangeNumber();
+ firstChangeNumber = getChangeNumber(db.readFirstCNIndexData());
+ lastChangeNumber = getChangeNumber(db.readLastCNIndexData());
// Trimming thread
thread = new DirectoryThread(this, "Replication DraftCN db");
@@ -134,34 +135,50 @@
DirectoryServer.registerMonitorProvider(dbMonitor);
}
+ private long getChangeNumber(CNIndexData cnIndexData)
+ throws ChangelogException
+ {
+ if (cnIndexData != null)
+ {
+ return cnIndexData.getChangeNumber();
+ }
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override
- public synchronized void add(long changeNumber, String previousCookie,
- String baseDN, CSN csn)
+ public void add(CNIndexData cnIndexData) throws ChangelogException
{
- db.addEntry(changeNumber, previousCookie, baseDN, csn);
+ db.addEntry(cnIndexData);
if (debugEnabled())
- TRACER.debugInfo(
- "In DraftCNDbhandler.add, added: "
- + " key=" + changeNumber
- + " previousCookie=" + previousCookie
- + " baseDN=" + baseDN
- + " csn=" + csn);
+ TRACER.debugInfo("In DraftCNDbhandler.add, added: " + cnIndexData);
}
/** {@inheritDoc} */
@Override
- public long getFirstChangeNumber()
+ public CNIndexData getFirstCNIndexData() throws ChangelogException
{
- return db.readFirstChangeNumber();
+ return db.readFirstCNIndexData();
}
/** {@inheritDoc} */
@Override
- public long getLastChangeNumber()
+ public CNIndexData getLastCNIndexData() throws ChangelogException
{
- return db.readLastChangeNumber();
+ return db.readLastCNIndexData();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getLastChangeNumber() throws ChangelogException
+ {
+ final CNIndexData data = getLastCNIndexData();
+ if (data != null)
+ {
+ return data.getChangeNumber();
+ }
+ return 0;
}
/**
@@ -331,8 +348,8 @@
}
// From the draftCNDb change record, get the domain and CSN
- final CSN csn = cursor.currentCSN();
- final String baseDN = cursor.currentBaseDN();
+ final CNIndexData data = cursor.currentData();
+ final String baseDN = data.getBaseDN();
if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN))
{
cursor.delete();
@@ -350,10 +367,11 @@
continue;
}
+ final CSN csn = data.getCSN();
final ServerState startState = domain.getStartState();
final CSN fcsn = startState.getCSN(csn.getServerId());
- final int currentChangeNumber = cursor.currentKey();
+ final long currentChangeNumber = data.getChangeNumber();
if (csn.older(fcsn))
{
@@ -366,7 +384,7 @@
{
Map<String, ServerState> csnStartStates =
MultiDomainServerState.splitGenStateToServerStates(
- cursor.currentValue());
+ data.getPreviousCookie());
csnVector = csnStartStates.get(baseDN);
if (debugEnabled())
@@ -430,12 +448,37 @@
public List<Attribute> getMonitorData()
{
List<Attribute> attributes = new ArrayList<Attribute>();
- attributes.add(Attributes.create("first-draft-changenumber",
- Integer.toString(db.readFirstChangeNumber())));
- attributes.add(Attributes.create("last-draft-changenumber",
- Integer.toString(db.readLastChangeNumber())));
- attributes.add(Attributes.create("count",
- Long.toString(count())));
+
+ try
+ {
+ CNIndexData firstCNData = db.readFirstCNIndexData();
+ String firstCN = String.valueOf(firstCNData.getChangeNumber());
+ attributes.add(Attributes.create("first-draft-changenumber", firstCN));
+ }
+ catch (ChangelogException e)
+ {
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ attributes.add(Attributes.create("first-draft-changenumber", "0"));
+ }
+
+ try
+ {
+ CNIndexData lastCNData = db.readLastCNIndexData();
+ if (lastCNData != null)
+ {
+ String lastCN = String.valueOf(lastCNData.getChangeNumber());
+ attributes.add(Attributes.create("last-draft-changenumber", lastCN));
+ }
+ }
+ catch (ChangelogException e)
+ {
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ attributes.add(Attributes.create("last-draft-changenumber", "0"));
+ }
+
+ attributes.add(Attributes.create("count", Long.toString(count())));
return attributes;
}
@@ -482,8 +525,8 @@
public void clear() throws ChangelogException
{
db.clear();
- firstChangeNumber = db.readFirstChangeNumber();
- lastChangeNumber = db.readLastChangeNumber();
+ firstChangeNumber = getChangeNumber(db.readFirstCNIndexData());
+ lastChangeNumber = getChangeNumber(db.readLastCNIndexData());
}
private ReentrantLock lock = new ReentrantLock();
@@ -516,13 +559,14 @@
/** {@inheritDoc} */
@Override
- public String getPreviousCookie(long changeNumber) throws ChangelogException
+ public CNIndexData getCNIndexData(long changeNumber)
+ throws ChangelogException
{
DraftCNDBCursor cursor = null;
try
{
cursor = db.openReadCursor(changeNumber);
- return cursor.currentValue();
+ return cursor.currentData();
}
finally
{
@@ -530,35 +574,4 @@
}
}
- /** {@inheritDoc} */
- @Override
- public CSN getCSN(long changeNumber) throws ChangelogException
- {
- DraftCNDBCursor cursor = null;
- try
- {
- cursor = db.openReadCursor(changeNumber);
- return cursor.currentCSN();
- }
- finally
- {
- close(cursor);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String getBaseDN(long changeNumber) throws ChangelogException
- {
- DraftCNDBCursor cursor = null;
- try
- {
- cursor = db.openReadCursor(changeNumber);
- return cursor.currentBaseDN();
- }
- finally
- {
- close(cursor);
- }
- }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
index 18ecd5d..762d027 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
@@ -29,7 +29,6 @@
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.DebugLogLevel;
@@ -68,13 +67,13 @@
/** {@inheritDoc} */
@Override
- public String getBaseDN()
+ public CNIndexData getCNIndexData()
{
try
{
- return this.draftCNDbCursor.currentBaseDN();
+ return this.draftCNDbCursor.currentData();
}
- catch(Exception e)
+ catch (Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
return null;
@@ -83,28 +82,6 @@
/** {@inheritDoc} */
@Override
- public CSN getCSN()
- {
- try
- {
- return this.draftCNDbCursor.currentCSN();
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public long getChangeNumber()
- {
- return ((ReplicationDraftCNKey) draftCNDbCursor.getKey()).getChangeNumber();
- }
-
- /** {@inheritDoc} */
- @Override
public boolean next() throws ChangelogException
{
if (draftCNDbCursor != null)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java
index e01c4a4..00229e6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java
@@ -31,6 +31,8 @@
import com.sleepycat.je.DatabaseEntry;
+import static org.opends.server.util.StaticUtils.*;
+
/**
* Useful to create ReplicationServer keys from sequence numbers.
*/
@@ -39,8 +41,18 @@
private static final long serialVersionUID = 1L;
/**
+ * Creates a ReplicationDraftCNKey that can start anywhere in the DB.
+ */
+ public ReplicationDraftCNKey()
+ {
+ super();
+ }
+
+ /**
* Creates a new ReplicationKey from the given change number.
- * @param changeNumber The change number to use.
+ *
+ * @param changeNumber
+ * The change number to use.
*/
public ReplicationDraftCNKey(long changeNumber)
{
@@ -63,6 +75,6 @@
*/
public long getChangeNumber()
{
- return Long.valueOf(new String(getData()));
+ return Long.valueOf(decodeUTF8(getData()));
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
index e577e28..7671929 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -36,6 +36,8 @@
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor;
@@ -45,21 +47,17 @@
import static org.testng.Assert.*;
/**
- * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db :
- * - periodic trim
- * - call to clear method()
+ * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db : -
+ * periodic trim - call to clear method()
*/
@SuppressWarnings("javadoc")
public class DraftCNDbHandlerTest extends ReplicationTestCase
{
/**
- * This test makes basic operations of a DraftCNDb :
- * - create the db
- * - add records
- * - read them with a cursor
- * - set a very short trim period
- * - wait for the db to be trimmed / here since the changes are not stored in
- * the replication changelog, the draftCNDb will be cleared.
+ * This test makes basic operations of a DraftCNDb : - create the db - add
+ * records - read them with a cursor - set a very short trim period - wait for
+ * the db to be trimmed / here since the changes are not stored in the
+ * replication changelog, the draftCNDb will be cleared.
*/
@Test()
void testDraftCNDbHandlerTrim() throws Exception
@@ -99,40 +97,32 @@
String baseDN2 = "baseDN2";
String baseDN3 = "baseDN3";
- CSNGenerator gen = new CSNGenerator( 1, 0);
+ CSNGenerator gen = new CSNGenerator(1, 0);
CSN csn1 = gen.newCSN();
CSN csn2 = gen.newCSN();
CSN csn3 = gen.newCSN();
// Add records
- handler.add(cn1, value1, baseDN1, csn1);
- handler.add(cn2, value2, baseDN2, csn2);
- handler.add(cn3, value3, baseDN3, csn3);
+ handler.add(new CNIndexData(cn1, value1, baseDN1, csn1));
+ handler.add(new CNIndexData(cn2, value2, baseDN2, csn2));
+ handler.add(new CNIndexData(cn3, value3, baseDN3, csn3));
// The ChangeNumber should not get purged
- final long firstChangeNumber = handler.getFirstChangeNumber();
+ final long firstChangeNumber = getFirstChangeNumber(handler);
assertEquals(firstChangeNumber, cn1);
- assertEquals(handler.getLastChangeNumber(), cn3);
+ assertEquals(getLastChangeNumber(handler), cn3);
DraftCNDBCursor dbc = handler.getReadCursor(firstChangeNumber);
try
{
- assertEquals(dbc.currentCSN(), csn1);
- assertEquals(dbc.currentBaseDN(), baseDN1);
- assertEquals(dbc.currentValue(), value1);
+ assertEqualTo(dbc.currentData(), csn1, baseDN1, value1);
assertTrue(dbc.toString().length() != 0);
assertTrue(dbc.next());
-
- assertEquals(dbc.currentCSN(), csn2);
- assertEquals(dbc.currentBaseDN(), baseDN2);
- assertEquals(dbc.currentValue(), value2);
+ assertEqualTo(dbc.currentData(), csn2, baseDN2, value2);
assertTrue(dbc.next());
-
- assertEquals(dbc.currentCSN(), csn3);
- assertEquals(dbc.currentBaseDN(), baseDN3);
- assertEquals(dbc.currentValue(), value3);
+ assertEqualTo(dbc.currentData(), csn3, baseDN3, value3);
assertFalse(dbc.next());
}
@@ -144,12 +134,12 @@
handler.setPurgeDelay(100);
// Check the db is cleared.
- while(handler.count()!=0)
+ while (handler.count() != 0)
{
Thread.sleep(200);
}
- assertEquals(handler.getFirstChangeNumber(), 0);
- assertEquals(handler.getLastChangeNumber(), 0);
+ assertEquals(getFirstChangeNumber(handler), 0);
+ assertEquals(getLastChangeNumber(handler), 0);
}
finally
{
@@ -163,6 +153,14 @@
}
}
+ private void assertEqualTo(CNIndexData data, CSN csn, String baseDN,
+ String cookie)
+ {
+ assertEquals(data.getCSN(), csn);
+ assertEquals(data.getBaseDN(), baseDN);
+ assertEquals(data.getPreviousCookie(), cookie);
+ }
+
private File createCleanDir() throws IOException
{
String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
@@ -225,26 +223,26 @@
String baseDN2 = "baseDN2";
String baseDN3 = "baseDN3";
- CSNGenerator gen = new CSNGenerator( 1, 0);
+ CSNGenerator gen = new CSNGenerator(1, 0);
CSN csn1 = gen.newCSN();
CSN csn2 = gen.newCSN();
CSN csn3 = gen.newCSN();
// Add records
- handler.add(cn1, value1, baseDN1, csn1);
- handler.add(cn2, value2, baseDN2, csn2);
- handler.add(cn3, value3, baseDN3, csn3);
+ handler.add(new CNIndexData(cn1, value1, baseDN1, csn1));
+ handler.add(new CNIndexData(cn2, value2, baseDN2, csn2));
+ handler.add(new CNIndexData(cn3, value3, baseDN3, csn3));
Thread.sleep(500);
// Checks
- assertEquals(handler.getFirstChangeNumber(), cn1);
- assertEquals(handler.getLastChangeNumber(), cn3);
+ assertEquals(getFirstChangeNumber(handler), cn1);
+ assertEquals(getLastChangeNumber(handler), cn3);
assertEquals(handler.count(), 3, "Db count");
- assertEquals(handler.getPreviousCookie(cn1), value1);
- assertEquals(handler.getPreviousCookie(cn2), value2);
- assertEquals(handler.getPreviousCookie(cn3), value3);
+ assertEquals(getPreviousCookie(handler, cn1), value1);
+ assertEquals(getPreviousCookie(handler, cn2), value2);
+ assertEquals(getPreviousCookie(handler, cn3), value3);
ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(cn1);
assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -258,8 +256,8 @@
handler.clear();
// Check the db is cleared.
- assertEquals(handler.getFirstChangeNumber(), 0);
- assertEquals(handler.getLastChangeNumber(), 0);
+ assertEquals(getFirstChangeNumber(handler), 0);
+ assertEquals(getLastChangeNumber(handler), 0);
assertEquals(handler.count(), 0);
}
finally
@@ -274,6 +272,29 @@
}
}
+ private long getFirstChangeNumber(ChangeNumberIndexDB handler) throws Exception
+ {
+ return handler.getFirstCNIndexData().getChangeNumber();
+ }
+
+ private long getLastChangeNumber(ChangeNumberIndexDB handler) throws Exception
+ {
+ return handler.getLastCNIndexData().getChangeNumber();
+ }
+
+ private String getPreviousCookie(DraftCNDbHandler handler, long changeNumber) throws Exception
+ {
+ ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(changeNumber);
+ try
+ {
+ return cursor.getCNIndexData().getPreviousCookie();
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor,
int... sns) throws ChangelogException
{
@@ -281,7 +302,7 @@
{
for (int i = 0; i < sns.length; i++)
{
- assertEquals(cursor.getChangeNumber(), sns[i]);
+ assertEquals(cursor.getCNIndexData().getChangeNumber(), sns[i]);
final boolean isNotLast = i + 1 < sns.length;
assertEquals(cursor.next(), isNotLast);
}
--
Gitblit v1.10.0