From 47ad5445b84bfedcf71353d2aebe46c65bd294a7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 09 Oct 2013 11:12:19 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 73 +++------
opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java | 34 ++--
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 116 ++++++++--------
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java | 24 +--
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java | 24 +--
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 62 ++++----
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java | 21 +-
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 45 +++---
opends/src/messages/messages/replication.properties | 6
9 files changed, 190 insertions(+), 215 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 2eef8cd..854cd46 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -492,7 +492,7 @@
SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \
failed to parse change record with changenumber %s from the database. Error: %s
SEVERE_ERR_SESSION_STARTUP_INTERRUPTED_216=%s was interrupted in the startup phase
-MILD_ERR_READING_FIRST_THEN_LAST_IN_CHANGENUMBER_DATABASE_217=An error occurred \
- when accessing the change number database: impossible to read the last record \
- after having successfully read the first. Database might have been cleaned or \
+MILD_ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE_217=An error occurred \
+ when accessing the change number database: impossible to read the newest record \
+ after having successfully read the oldest. Database might have been cleaned or \
closed between successive reads
diff --git a/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java b/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
index 52b77a2..7a985d7 100644
--- a/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -94,7 +94,7 @@
@Override()
public void finalizeVirtualAttributeProvider()
{
- //
+ // nothing to finalize
}
@@ -126,7 +126,7 @@
@Override()
public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule)
{
- String first = "0";
+ String value = "0";
try
{
ECLWorkflowElement eclwe = (ECLWorkflowElement)
@@ -138,26 +138,22 @@
MultimasterReplication.getECLDisabledDomains();
excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
- ReplicationServer rs = eclwe.getReplicationServer();
- long[] limits = rs.getECLChangeNumberLimits(
+ final ReplicationServer rs = eclwe.getReplicationServer();
+ final long[] limits = rs.getECLChangeNumberLimits(
rs.getEligibleCSN(excludedDomains), excludedDomains);
-
- first = String.valueOf(limits[0]);
+ value = String.valueOf(limits[0]);
}
}
catch(Exception e)
{
- // We got an error computing the first change number.
+ // We got an error computing this change number.
// Rather than returning 0 which is no change, return -1 to
// indicate the error.
- first = "-1";
+ value = "-1";
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- AttributeValue value =
- AttributeValues.create(
- ByteString.valueOf(first),
- ByteString.valueOf(first));
- return Collections.singleton(value);
+ ByteString valueBS = ByteString.valueOf(value);
+ return Collections.singleton(AttributeValues.create(valueBS, valueBS));
}
@@ -170,7 +166,7 @@
SearchOperation searchOperation,
boolean isPreIndexed)
{
- // We do not allow search for the firstChangeNumber. It's a read-only
+ // We do not allow search for this change number. It's a read-only
// attribute of the RootDSE.
return false;
}
diff --git a/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java b/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
index 5ab3f53..626fea5 100644
--- a/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -94,7 +94,7 @@
@Override()
public void finalizeVirtualAttributeProvider()
{
- //
+ // nothing to finalize
}
@@ -126,7 +126,7 @@
@Override()
public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule)
{
- String last = "0";
+ String value = "0";
try
{
ECLWorkflowElement eclwe = (ECLWorkflowElement)
@@ -138,26 +138,22 @@
MultimasterReplication.getECLDisabledDomains();
excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
- ReplicationServer rs = eclwe.getReplicationServer();
- long[] limits = rs.getECLChangeNumberLimits(
+ final ReplicationServer rs = eclwe.getReplicationServer();
+ final long[] limits = rs.getECLChangeNumberLimits(
rs.getEligibleCSN(excludedDomains), excludedDomains);
-
- last = String.valueOf(limits[1]);
+ value = String.valueOf(limits[1]);
}
}
catch(Exception e)
{
- // We got an error computing the first change number.
+ // We got an error computing this change number.
// Rather than returning 0 which is no change, return -1 to
// indicate the error.
- last = "-1";
+ value = "-1";
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- AttributeValue value =
- AttributeValues.create(
- ByteString.valueOf(last),
- ByteString.valueOf(last));
- return Collections.singleton(value);
+ ByteString valueBS = ByteString.valueOf(value);
+ return Collections.singleton(AttributeValues.create(valueBS, valueBS));
}
@@ -170,7 +166,7 @@
SearchOperation searchOperation,
boolean isPreIndexed)
{
- // We do not allow search for the lastChangeNumber. It's a read-only
+ // We do not allow search for this change number. It's a read-only
// attribute of the RootDSE.
return false;
}
diff --git a/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java b/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
index bc5abe1..ce91b48 100644
--- a/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
@@ -27,10 +27,7 @@
*/
package org.opends.server.replication.common;
-import static org.opends.messages.ExtensionMessages.*;
-
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -41,12 +38,16 @@
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
+import static org.opends.messages.ExtensionMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
/**
* This class implements a virtual attribute provider in the root-dse entry
* that contains the last (newest) cookie (cross domain state)
@@ -58,6 +59,11 @@
{
/**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
* Creates a new instance of this member virtual attribute provider.
*/
public LastCookieVirtualProvider()
@@ -89,7 +95,7 @@
@Override()
public void finalizeVirtualAttributeProvider()
{
- //
+ // nothing to finalize
}
/**
@@ -120,7 +126,6 @@
@Override()
public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule)
{
- Set<AttributeValue> values = new HashSet<AttributeValue>();
try
{
ECLWorkflowElement eclwe = (ECLWorkflowElement)
@@ -132,22 +137,17 @@
MultimasterReplication.getECLDisabledDomains();
excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
- ReplicationServer rs = eclwe.getReplicationServer();
- MultiDomainServerState lastCookie =
- rs.getLastECLCookie(excludedDomains);
-
- AttributeValue value =
- AttributeValues.create(
- ByteString.valueOf(lastCookie.toString()),
- ByteString.valueOf(lastCookie.toString()));
- values=Collections.singleton(value);
+ final ReplicationServer rs = eclwe.getReplicationServer();
+ String newestCookie = rs.getNewestECLCookie(excludedDomains).toString();
+ final ByteString cookie = ByteString.valueOf(newestCookie);
+ return Collections.singleton(AttributeValues.create(cookie, cookie));
}
- return values;
}
- catch(Exception e)
+ catch (Exception e)
{
- return values;
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
+ return Collections.emptySet();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index aa79495..e72dabe 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -277,10 +277,10 @@
}
else if (newMsg.getCSN().getTime() >= domainLatestTrimDate)
{
- // when the replication changelog is trimmed, the last (latest) chg
- // is left in the db (whatever its age), and we don't want this chg
+ // when the replication changelog is trimmed, the newest change
+ // is left in the DB (whatever its age), and we don't want this change
// to be returned in the external changelog.
- // So let's check if the chg time is older than the trim date
+ // So let's check if the change time is older than the trim date
return newMsg;
}
}
@@ -528,10 +528,10 @@
}
/**
- * Initialize the handler from a provided first change number.
+ * Initialize the handler from a provided start change number.
*
* @param startChangeNumber
- * The provided first change number.
+ * The provided start change number.
* @throws DirectoryException
* When an error is raised.
*/
@@ -582,29 +582,28 @@
if (startChangeNumber <= 1)
{
- // Request filter DOES NOT contain any first change number
- // So we'll generate from the first change number in the CNIndexDB
- final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord();
- if (firstCNRecord == null)
+ // Request filter DOES NOT contain any start change number
+ // So we'll generate from the oldest change number in the CNIndexDB
+ final CNIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
+ if (oldestRecord == null)
{ // DB is empty or closed
isEndOfCNIndexDBReached = true;
return null;
}
- final long firstChangeNumber = firstCNRecord.getChangeNumber();
- final String crossDomainStartState = firstCNRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
+ final String crossDomainStartState = oldestRecord.getPreviousCookie();
+ cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestRecord.getChangeNumber());
return crossDomainStartState;
}
// Request filter DOES contain a startChangeNumber
// Read the CNIndexDB to see whether it contains startChangeNumber
- CNIndexRecord startCNRecord = cnIndexDB.getRecord(startChangeNumber);
- if (startCNRecord != null)
+ CNIndexRecord startRecord = cnIndexDB.getRecord(startChangeNumber);
+ if (startRecord != null)
{
// found the provided startChangeNumber, let's return it
- final String crossDomainStartState = startCNRecord.getPreviousCookie();
+ final String crossDomainStartState = startRecord.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
return crossDomainStartState;
}
@@ -613,50 +612,49 @@
/*
* Get the changeNumberLimits (from the eligibleCSN obtained at the start of
- * this method) in order to have the first and last change numbers.
+ * this method) in order to have the oldest and newest change numbers.
*/
final long[] limits = replicationServer.getECLChangeNumberLimits(
eligibleCSN, excludedBaseDNs);
- final long firstChangeNumber = limits[0];
- final long lastChangeNumber = limits[1];
+ final long oldestChangeNumber = limits[0];
+ final long newestChangeNumber = limits[1];
- // If the startChangeNumber provided is lower than the firstChangeNumber in
+ // If the startChangeNumber provided is lower than the oldestChangeNumber in
// the DB, let's use the lower limit.
- if (startChangeNumber < firstChangeNumber)
+ if (startChangeNumber < oldestChangeNumber)
{
- CNIndexRecord firstCNRecord = cnIndexDB.getRecord(firstChangeNumber);
- if (firstCNRecord == null)
+ CNIndexRecord oldestRecord = cnIndexDB.getRecord(oldestChangeNumber);
+ if (oldestRecord == null)
{
// This should not happen
isEndOfCNIndexDBReached = true;
return null;
}
- final String crossDomainStartState = firstCNRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
+ final String crossDomainStartState = oldestRecord.getPreviousCookie();
+ cnIndexDBCursor = cnIndexDB.getCursorFrom(oldestChangeNumber);
return crossDomainStartState;
}
- else if (startChangeNumber <= lastChangeNumber)
+ else if (startChangeNumber <= newestChangeNumber)
{
- // startChangeNumber is between first and potential last and has never
+ // startChangeNumber is between oldest and potential newest and has never
// been returned yet
- final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
- if (lastCNRecord == null)
+ final CNIndexRecord newestRecord = cnIndexDB.getNewestRecord();
+ if (newestRecord == null)
{
isEndOfCNIndexDBReached = true;
return null;
}
- final long lastKey = lastCNRecord.getChangeNumber();
- final String crossDomainStartState = lastCNRecord.getPreviousCookie();
- cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
+ final String crossDomainStartState = newestRecord.getPreviousCookie();
+ cnIndexDBCursor = cnIndexDB.getCursorFrom(newestRecord.getChangeNumber());
return crossDomainStartState;
// TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ...
// this may be very long. Work on perf improvement here.
}
- // startChangeNumber is greater than the potential lastChangeNumber
+ // startChangeNumber is greater than the potential newest change number
throw new DirectoryException(ResultCode.SUCCESS, Message.raw(""));
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 7716313..c0fdf78 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1363,18 +1363,18 @@
}
/**
- * Get first and last change number.
+ * Get oldest and newest change numbers.
*
- * @param crossDomainEligibleCSN
+ * @param maxOldestChangeNumber
* The provided crossDomainEligibleCSN used as the upper limit for
- * the last change number
+ * the oldest change number
* @param excludedBaseDNs
* The baseDNs that are excluded from the ECL.
- * @return The first and last change numbers.
+ * @return The oldest and newest change numbers.
* @throws DirectoryException
* When it happens.
*/
- public long[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN,
+ public long[] getECLChangeNumberLimits(CSN maxOldestChangeNumber,
Set<String> excludedBaseDNs) throws DirectoryException
{
/* The content of the CNIndexDB depends on the SEARCH operations done before
@@ -1382,103 +1382,105 @@
* The limits we want to get are the "potential" limits if a request was
* done, the CNIndexDB is probably not complete to do that.
*
- * The first change number is :
- * - the first record from the CNIndexDB
+ * The oldest change number is :
+ * - the oldest record from the CNIndexDB
* - if none because CNIndexDB empty,
* then
* if no change in replchangelog then return 0
* else return 1 (change number that WILL be returned to next search)
*
- * The last change number is :
- * - initialized with the last record from the CNIndexDB (0 if none)
+ * The newest change number is :
+ * - initialized with the newest record from the CNIndexDB (0 if none)
* and consider the genState associated
- * - to the last change number, we add the count of updates in the
+ * - to the newest change number, we add the count of updates in the
* replchangelog FROM that genState TO the crossDomainEligibleCSN
* (this diff is done domain by domain)
*/
try
{
boolean dbEmpty = true;
- long firstChangeNumber = 0;
- long lastChangeNumber = 0;
+ long oldestChangeNumber = 0;
+ long newestChangeNumber = 0;
final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
- final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord();
- final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
+ final CNIndexRecord oldestCNRecord = cnIndexDB.getOldestRecord();
+ final CNIndexRecord newestCNRecord = cnIndexDB.getNewestRecord();
- boolean noCookieForLastCN = true;
- CSN csnForLastCN = null;
- DN domainForLastCN = null;
- if (firstCNRecord != null)
+ boolean noCookieForNewestCN = true;
+ CSN csnForNewestCN = null;
+ DN baseDNForNewestCN = null;
+ if (oldestCNRecord != null)
{
- if (lastCNRecord == null)
+ if (newestCNRecord == null)
{
- // Edge case: DB was cleaned or closed in between call to getFirst*()
- // and getLast*(). The only remaining solution is to fail fast.
+ // Edge case: DB was cleaned or closed in between calls to
+ // getOldest*() and getNewest*().
+ // The only remaining solution is to fail fast.
throw new ChangelogException(
- ERR_READING_FIRST_THEN_LAST_IN_CHANGENUMBER_DATABASE.get());
+ ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
}
dbEmpty = false;
- firstChangeNumber = firstCNRecord.getChangeNumber();
- lastChangeNumber = lastCNRecord.getChangeNumber();
+ oldestChangeNumber = oldestCNRecord.getChangeNumber();
+ newestChangeNumber = newestCNRecord.getChangeNumber();
- // Get the generalized state associated with the current last change
+ // Get the generalized state associated with the current newest change
// number and initializes from it the startStates table
- String lastCNGenState = lastCNRecord.getPreviousCookie();
- noCookieForLastCN = lastCNGenState == null
- || lastCNGenState.length() == 0;
+ String newestCNGenState = newestCNRecord.getPreviousCookie();
+ noCookieForNewestCN =
+ newestCNGenState == null || newestCNGenState.length() == 0;
- csnForLastCN = lastCNRecord.getCSN();
- domainForLastCN = lastCNRecord.getBaseDN();
+ csnForNewestCN = newestCNRecord.getCSN();
+ baseDNForNewestCN = newestCNRecord.getBaseDN();
}
long newestDate = 0;
- for (ReplicationServerDomain rsd : getReplicationServerDomains())
+ for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
{
- if (contains(excludedBaseDNs, rsd.getBaseDN().toNormalizedString()))
+ if (contains(
+ excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString()))
continue;
// for this domain, have the state in the replchangelog
- // where the last change number update is
+ // where the newest change number update is
long ec;
- if (noCookieForLastCN)
+ if (noCookieForNewestCN)
{
// Count changes of this domain from the beginning of the changelog
- CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
- ec = rsd.getEligibleCount(
- rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
- crossDomainEligibleCSN);
+ CSN trimCSN = new CSN(rsDomain.getLatestDomainTrimDate(), 0, 0);
+ ec = rsDomain.getEligibleCount(
+ rsDomain.getStartState().duplicateOnlyOlderThan(trimCSN),
+ maxOldestChangeNumber);
}
else
{
// There are records in the CNIndexDB (so already returned to clients)
// BUT
- // There is nothing related to this domain in the last CNIndexRecord
+ // There is nothing related to this domain in the newest CNIndexRecord
// (may be this domain was disabled when this record was returned).
// In that case, are counted the changes from
- // the date of the most recent change from this last CNIndexRecord
+ // the date of the most recent change from this newest CNIndexRecord
if (newestDate == 0)
{
- newestDate = csnForLastCN.getTime();
+ newestDate = csnForNewestCN.getTime();
}
// And count changes of this domain from the date of the
- // lastseqnum record (that does not refer to this domain)
- CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
- ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
+ // newest seqnum record (that does not refer to this domain)
+ CSN csnx = new CSN(newestDate, csnForNewestCN.getSeqnum(), 0);
+ ec = rsDomain.getEligibleCount(csnx, maxOldestChangeNumber);
- if (domainForLastCN.equals(rsd.getBaseDN()))
+ if (baseDNForNewestCN.equals(rsDomain.getBaseDN()))
ec--;
}
// cumulates on domains
- lastChangeNumber += ec;
+ newestChangeNumber += ec;
// CNIndexDB is empty and there are eligible updates in the replication
- // changelog then init first change number
- if (ec > 0 && firstChangeNumber == 0)
- firstChangeNumber = 1;
+ // changelog then init oldest change number
+ if (ec > 0 && oldestChangeNumber == 0)
+ oldestChangeNumber = 1;
}
if (dbEmpty)
@@ -1486,10 +1488,10 @@
// The database was empty, just keep increasing numbers since last time
// we generated one change number.
long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
- firstChangeNumber += lastGeneratedCN;
- lastChangeNumber += lastGeneratedCN;
+ oldestChangeNumber += lastGeneratedCN;
+ newestChangeNumber += lastGeneratedCN;
}
- return new long[] { firstChangeNumber, lastChangeNumber };
+ return new long[] { oldestChangeNumber, newestChangeNumber };
}
catch (ChangelogException e)
{
@@ -1498,11 +1500,13 @@
}
/**
- * Returns the last (newest) cookie value.
- * @param excludedBaseDNs The list of baseDNs excluded from ECL.
- * @return the last cookie value.
+ * Returns the newest cookie value.
+ *
+ * @param excludedBaseDNs
+ * The list of baseDNs excluded from ECL.
+ * @return the newest cookie value.
*/
- public MultiDomainServerState getLastECLCookie(Set<String> excludedBaseDNs)
+ public MultiDomainServerState getNewestECLCookie(Set<String> excludedBaseDNs)
{
// Initialize start state for all running domains with empty state
MultiDomainServerState result = new MultiDomainServerState();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index bda1390..d972f66 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -46,7 +46,7 @@
/**
* Returns the last generated change number.
*
- * @return the lastGeneratedChangeNumber
+ * @return the last generated change number
*/
long getLastGeneratedChangeNumber();
@@ -62,31 +62,30 @@
CNIndexRecord getRecord(long changeNumber) throws ChangelogException;
/**
- * Get the first record stored in this DB.
+ * Get the oldest record stored in this DB.
*
- * @return Returns the first {@link CNIndexRecord} in this DB, null when the
+ * @return Returns the oldest {@link CNIndexRecord} in this DB, null when the
* DB is empty or closed
* @throws ChangelogException
* if a database problem occurs.
*/
- CNIndexRecord getFirstRecord() throws ChangelogException;
+ CNIndexRecord getOldestRecord() throws ChangelogException;
/**
- * Get the last record stored in this DB.
+ * Get the newest record stored in this DB.
*
- * @return Returns the last {@link CNIndexRecord} in this DB, null when the DB
- * is empty or closed
+ * @return Returns the newest {@link CNIndexRecord} in this DB, null when the
+ * DB is empty or closed
* @throws ChangelogException
* if a database problem occurs.
*/
- CNIndexRecord getLastRecord() throws ChangelogException;
+ CNIndexRecord getNewestRecord() throws ChangelogException;
/**
* Add an update to the list of messages that must be saved to this DB managed
- * by this DB.
+ * by this DB and return the changeNumber associated to this record.
* <p>
- * This method is blocking if the size of the list of message is larger than
- * its maximum.
+ * Note: this method disregards the changeNumber in the provided record.
* <p>
* FIXME will be removed when ECLServerHandler will not be responsible anymore
* for lazily building the ChangeNumberIndexDB.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index a6e9d17..d2d7795 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -74,15 +74,15 @@
private DraftCNDB db;
/**
- * FIXME Is this field that useful? {@link #getFirstChangeNumber()} does not
+ * FIXME Is this field that useful? {@link #getOldestChangeNumber()} does not
* even use it!
*/
- private long firstChangeNumber = NO_KEY;
+ private volatile long oldestChangeNumber = NO_KEY;
/**
- * FIXME Is this field that useful? {@link #getLastChangeNumber()} does not
+ * FIXME Is this field that useful? {@link #getNewestChangeNumber()} does not
* even use it!
*/
- private long lastChangeNumber = NO_KEY;
+ private volatile long newestChangeNumber = NO_KEY;
/** The last generated value for the change number. */
private final AtomicLong lastGeneratedChangeNumber;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
@@ -102,7 +102,7 @@
* FIXME it never gets updated even when the replication server purge delay is
* updated
*/
- private long trimAge;
+ private volatile long trimAge;
private ReplicationServer replicationServer;
@@ -123,14 +123,14 @@
// DB initialization
db = new DraftCNDB(dbenv);
- final CNIndexRecord firstRecord = db.readFirstRecord();
- final CNIndexRecord lastRecord = db.readLastRecord();
- firstChangeNumber = getChangeNumber(firstRecord);
- lastChangeNumber = getChangeNumber(lastRecord);
+ final CNIndexRecord oldestRecord = db.readFirstRecord();
+ final CNIndexRecord newestRecord = db.readLastRecord();
+ oldestChangeNumber = getChangeNumber(oldestRecord);
+ newestChangeNumber = getChangeNumber(newestRecord);
// initialization of the lastGeneratedChangeNumber from the DB content
// if DB is empty => last record does not exist => default to 0
- lastGeneratedChangeNumber =
- new AtomicLong((lastRecord != null) ? lastRecord.getChangeNumber() : 0);
+ long newestCN = (newestRecord != null) ? newestRecord.getChangeNumber() : 0;
+ lastGeneratedChangeNumber = new AtomicLong(newestCN);
// Trimming thread
thread =
@@ -168,14 +168,14 @@
/** {@inheritDoc} */
@Override
- public CNIndexRecord getFirstRecord() throws ChangelogException
+ public CNIndexRecord getOldestRecord() throws ChangelogException
{
return db.readFirstRecord();
}
/** {@inheritDoc} */
@Override
- public CNIndexRecord getLastRecord() throws ChangelogException
+ public CNIndexRecord getNewestRecord() throws ChangelogException
{
return db.readLastRecord();
}
@@ -211,7 +211,7 @@
*/
public boolean isEmpty() throws ChangelogException
{
- return getLastRecord() == null;
+ return getNewestRecord() == null;
}
/**
@@ -375,12 +375,12 @@
continue;
}
+ // Purge up to wherever the other DBs have been purged to.
+ // FIXME there is an opportunity for a phantom record in the current
+ // DB if the replicaDB gets purged after the next if statement.
final CSN csn = record.getCSN();
final ServerState startState = domain.getStartState();
final CSN fcsn = startState.getCSN(csn.getServerId());
-
- final long currentChangeNumber = record.getChangeNumber();
-
if (csn.isOlderThan(fcsn))
{
cursor.delete();
@@ -402,6 +402,7 @@
catch(Exception e)
{
// We could not parse the MultiDomainServerState from the record
+ // FIXME this is quite an aggressive delete()
cursor.delete();
continue;
}
@@ -417,7 +418,7 @@
continue;
}
- firstChangeNumber = currentChangeNumber;
+ oldestChangeNumber = record.getChangeNumber();
cursor.close();
return;
}
@@ -515,8 +516,8 @@
@Override
public String toString()
{
- return "JEChangeNumberIndexDB: " + firstChangeNumber + " "
- + lastChangeNumber;
+ return getClass().getSimpleName() + ": " + oldestChangeNumber + " "
+ + newestChangeNumber;
}
/**
@@ -537,8 +538,8 @@
public void clear() throws ChangelogException
{
db.clear();
- firstChangeNumber = getChangeNumber(db.readFirstRecord());
- lastChangeNumber = getChangeNumber(db.readLastRecord());
+ oldestChangeNumber = getChangeNumber(db.readFirstRecord());
+ newestChangeNumber = getChangeNumber(db.readLastRecord());
}
private ReentrantLock lock = new ReentrantLock();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index d176557..4b92f77 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -71,24 +71,11 @@
JEChangeNumberIndexDB cnIndexDB = null;
try
{
- TestCaseUtils.startServer();
-
- int changelogPort = TestCaseUtils.findFreePort();
-
- // configure a ReplicationServer.
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0,
- 2, 0, 100, null);
- replicationServer = new ReplicationServer(conf);
-
+ replicationServer = newReplicationServer();
cnIndexDB = newCNIndexDB(replicationServer);
cnIndexDB.setPurgeDelay(0);
// Prepare data to be stored in the db
- int cn1 = 3;
- int cn2 = 4;
- int cn3 = 5;
-
String value1 = "value1";
String value2 = "value2";
String value3 = "value3";
@@ -100,16 +87,16 @@
CSN[] csns = newCSNs(1, 0, 3);
// Add records
- cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0]));
- cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1]));
- cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2]));
+ long cn1 = cnIndexDB.addRecord(new CNIndexRecord(value1, baseDN1, csns[0]));
+ cnIndexDB.addRecord(new CNIndexRecord(value2, baseDN2, csns[1]));
+ long cn3 = cnIndexDB.addRecord(new CNIndexRecord(value3, baseDN3, csns[2]));
// The ChangeNumber should not get purged
- final long firstChangeNumber = cnIndexDB.getFirstRecord().getChangeNumber();
- assertEquals(firstChangeNumber, cn1);
- assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3);
+ final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
+ assertEquals(oldestCN, cn1);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
- DraftCNDBCursor dbc = cnIndexDB.getReadCursor(firstChangeNumber);
+ DraftCNDBCursor dbc = cnIndexDB.getReadCursor(oldestCN);
try
{
assertEqualTo(dbc.currentRecord(), csns[0], baseDN1, value1);
@@ -135,8 +122,8 @@
{
Thread.sleep(200);
}
- assertNull(cnIndexDB.getFirstRecord());
- assertNull(cnIndexDB.getLastRecord());
+ assertNull(cnIndexDB.getOldestRecord());
+ assertNull(cnIndexDB.getNewestRecord());
assertEquals(cnIndexDB.count(), 0);
}
finally
@@ -191,26 +178,13 @@
JEChangeNumberIndexDB cnIndexDB = null;
try
{
- TestCaseUtils.startServer();
-
- int changelogPort = TestCaseUtils.findFreePort();
-
- // configure a ReplicationServer.
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0,
- 2, 0, 100, null);
- replicationServer = new ReplicationServer(conf);
-
+ replicationServer = newReplicationServer();
cnIndexDB = newCNIndexDB(replicationServer);
cnIndexDB.setPurgeDelay(0);
assertTrue(cnIndexDB.isEmpty());
// Prepare data to be stored in the db
- int cn1 = 3;
- int cn2 = 4;
- int cn3 = 5;
-
String value1 = "value1";
String value2 = "value2";
String value3 = "value3";
@@ -222,14 +196,13 @@
CSN[] csns = newCSNs(1, 0, 3);
// Add records
- cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0]));
- cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1]));
- cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2]));
- Thread.sleep(500);
+ long cn1 = cnIndexDB.addRecord(new CNIndexRecord(value1, baseDN1, csns[0]));
+ long cn2 = cnIndexDB.addRecord(new CNIndexRecord(value2, baseDN2, csns[1]));
+ long cn3 = cnIndexDB.addRecord(new CNIndexRecord(value3, baseDN3, csns[2]));
// Checks
- assertEquals(cnIndexDB.getFirstRecord().getChangeNumber(), cn1);
- assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3);
+ assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
assertEquals(cnIndexDB.count(), 3, "Db count");
assertFalse(cnIndexDB.isEmpty());
@@ -250,8 +223,8 @@
cnIndexDB.clear();
// Check the db is cleared.
- assertNull(cnIndexDB.getFirstRecord());
- assertNull(cnIndexDB.getLastRecord());
+ assertNull(cnIndexDB.getOldestRecord());
+ assertNull(cnIndexDB.getNewestRecord());
assertEquals(cnIndexDB.count(), 0);
assertTrue(cnIndexDB.isEmpty());
}
@@ -263,6 +236,14 @@
}
}
+ private ReplicationServer newReplicationServer() throws Exception
+ {
+ TestCaseUtils.startServer();
+ final int port = TestCaseUtils.findFreePort();
+ return new ReplicationServer(
+ new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null)) ;
+ }
+
private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,
long changeNumber) throws Exception
{
@@ -278,7 +259,7 @@
}
private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor,
- int... sns) throws ChangelogException
+ long... sns) throws ChangelogException
{
try
{
--
Gitblit v1.10.0