From 84cf626ebcae1b535abe9efd3eed5cdf78bdd319 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Sep 2013 07:51:54 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 97 ++++++++++++++++++++++++------------------------
1 files changed, 49 insertions(+), 48 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index e0b585d..ee5f785 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,7 +41,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
@@ -71,7 +74,7 @@
/**
* Specifies the last changer number requested.
*/
- private int lastChangeNumber = 0;
+ private long lastChangeNumber = 0;
/**
* Specifies whether the change number db has been read until its end.
*/
@@ -522,7 +525,7 @@
* @throws DirectoryException
* When an error is raised.
*/
- private void initializeCLSearchFromChangeNumber(int startChangeNumber)
+ private void initializeCLSearchFromChangeNumber(long startChangeNumber)
throws DirectoryException
{
try
@@ -535,13 +538,13 @@
catch(DirectoryException de)
{
TRACER.debugCaught(DebugLogLevel.ERROR, de);
- releaseIterator();
+ releaseCursor();
throw de;
}
catch(Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- releaseIterator();
+ releaseCursor();
throw new DirectoryException(
ResultCode.OPERATIONS_ERROR,
Message.raw(Category.SYNC,
@@ -561,9 +564,8 @@
* @throws DirectoryException
* if a database problem occurred
*/
- private String findCookie(final int startChangeNumber)
- throws ChangelogException,
- DirectoryException
+ private String findCookie(final long startChangeNumber)
+ throws ChangelogException, DirectoryException
{
final ChangeNumberIndexDB cnIndexDB =
replicationServer.getChangeNumberIndexDB();
@@ -581,9 +583,9 @@
return null;
}
- final long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
- final String crossDomainStartState =
- cnIndexDB.getPreviousCookie(firstChangeNumber);
+ final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData();
+ final long firstChangeNumber = firstCNData.getChangeNumber();
+ final String crossDomainStartState = firstCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
return crossDomainStartState;
}
@@ -591,11 +593,11 @@
// Request filter DOES contain a startChangeNumber
// Read the draftCNDb to see whether it contains startChangeNumber
- String crossDomainStartState =
- cnIndexDB.getPreviousCookie(startChangeNumber);
- if (crossDomainStartState != null)
+ CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber);
+ if (startCNData != null)
{
// found the provided startChangeNumber, let's return it
+ final String crossDomainStartState = startCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
return crossDomainStartState;
}
@@ -615,9 +617,10 @@
// the DB, let's use the lower limit.
if (startChangeNumber < firstChangeNumber)
{
- crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber);
- if (crossDomainStartState != null)
+ CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber);
+ if (firstCNData != null)
{
+ final String crossDomainStartState = firstCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
return crossDomainStartState;
}
@@ -636,8 +639,9 @@
return null;
}
- final long lastKey = cnIndexDB.getLastChangeNumber();
- crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
+ final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData();
+ final long lastKey = lastCNData.getChangeNumber();
+ final String crossDomainStartState = lastCNData.getPreviousCookie();
cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
return crossDomainStartState;
@@ -897,7 +901,7 @@
{
if (debugEnabled())
TRACER.debugInfo(this + " shutdown()");
- releaseIterator();
+ releaseCursor();
for (DomainContext domainCtxt : domainCtxts) {
if (!domainCtxt.unRegisterHandler()) {
logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -910,7 +914,7 @@
domainCtxts = null;
}
- private void releaseIterator()
+ private void releaseCursor()
{
if (this.cnIndexDBCursor != null)
{
@@ -1256,13 +1260,10 @@
oldestContext.currentState.update(
change.getUpdateMsg().getCSN());
- if (oldestContext.currentState.cover(oldestContext.stopState))
- {
- oldestContext.active = false;
- }
- if (draftCompat
- && lastChangeNumber > 0
- && change.getChangeNumber() > lastChangeNumber)
+ if (oldestContext.currentState.cover(oldestContext.stopState)
+ || (draftCompat
+ && lastChangeNumber > 0
+ && change.getChangeNumber() > lastChangeNumber))
{
oldestContext.active = false;
}
@@ -1278,8 +1279,9 @@
if (searchPhase == PERSISTENT_PHASE)
{
if (debugEnabled())
- clDomCtxtsToString("In getNextECLUpdate (persistent): " +
- "looking for the generalized oldest change");
+ TRACER.debugInfo(clDomCtxtsToString(
+ "In getNextECLUpdate (persistent): "
+ + "looking for the generalized oldest change"));
for (DomainContext domainCtxt : domainCtxts) {
domainCtxt.getNextEligibleMessageForDomain(operationId);
@@ -1300,7 +1302,7 @@
if (draftCompat)
{
- assignNewDraftCNAndStore(change);
+ assignNewChangeNumberAndStore(change);
}
oldestChange = change;
}
@@ -1317,21 +1319,19 @@
if (oldestChange != null)
{
+ final CSN csn = oldestChange.getUpdateMsg().getCSN();
if (debugEnabled())
- TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
- + oldestChange.getUpdateMsg().getCSN());
+ TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
// Update the current state
- previousCookie.update(
- oldestChange.getBaseDN(),
- oldestChange.getUpdateMsg().getCSN());
+ previousCookie.update(oldestChange.getBaseDN(), csn);
// Set the current value of global state in the returned message
oldestChange.setCookie(previousCookie);
if (debugEnabled())
- TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
- oldestChange);
+ TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
+ + oldestChange);
}
return oldestChange;
@@ -1370,14 +1370,15 @@
if (isEndOfCNIndexDBReached)
{
// we are at the end of the DraftCNdb in the append mode
- assignNewDraftCNAndStore(oldestChange);
+ assignNewChangeNumberAndStore(oldestChange);
return true;
}
// the next change from the CNIndexDB
- CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN();
- String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN();
+ final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData();
+ final CSN csnFromDraftCNDb = cnIndexData.getCSN();
+ final String dnFromDraftCNDb = cnIndexData.getBaseDN();
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() generating change number "
@@ -1392,10 +1393,10 @@
{
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() generating change number "
- + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber()
+ + " assigning changeNumber=" + cnIndexData.getChangeNumber()
+ " to change=" + oldestChange);
- oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber());
+ oldestChange.setChangeNumber(cnIndexData.getChangeNumber());
return true;
}
@@ -1429,8 +1430,8 @@
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() generating change number has"
- + "skipped to changeNumber=" + cnIndexDBCursor.getChangeNumber()
- + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?"
+ + "skipped to changeNumber=" + cnIndexData.getChangeNumber()
+ + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?"
+ isEndOfCNIndexDBReached);
}
catch (ChangelogException e)
@@ -1452,7 +1453,7 @@
return sameDN && sameCSN;
}
- private void assignNewDraftCNAndStore(ECLUpdateMsg change)
+ private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
throws DirectoryException, ChangelogException
{
// generate a new change number and assign to this change
@@ -1460,11 +1461,11 @@
// store in CNIndexDB the pair
// (change number of the current change, state before this change)
- replicationServer.getChangeNumberIndexDB().add(
+ replicationServer.getChangeNumberIndexDB().add(new CNIndexData(
change.getChangeNumber(),
previousCookie.toString(),
change.getBaseDN(),
- change.getUpdateMsg().getCSN());
+ change.getUpdateMsg().getCSN()));
}
/**
@@ -1499,7 +1500,7 @@
}
// End of INIT_PHASE => always release the iterator
- releaseIterator();
+ releaseCursor();
}
/**
--
Gitblit v1.10.0