From 82f5228d84de25cd2ea7d99e9880a8c11971e743 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Oct 2013 14:40:12 +0000
Subject: [PATCH] Code cleanups.
---
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 323 +++++++++++++++++++++++------------------------------
1 files changed, 142 insertions(+), 181 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 1adfaaf..cd9d4a1 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -50,6 +50,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.*;
+import static org.opends.server.util.StaticUtils.*;
/**
* This class defines a server handler, which handles all interaction with a
@@ -58,6 +59,11 @@
public final class ECLServerHandler extends ServerHandler
{
+ private static int UNDEFINED_PHASE = 0;
+ /** TODO JNR. */
+ public static int INIT_PHASE = 1;
+ private static int PERSISTENT_PHASE = 2;
+
/**
* This is a string identifying the operation, provided by the client part of
* the ECL, used to help interpretation of messages logged.
@@ -108,6 +114,11 @@
private CSN eligibleCSN;
/**
+ * The global list of contexts by domain for the search currently processed.
+ */
+ private DomainContext[] domainCtxts = new DomainContext[0];
+
+ /**
* Provides a string representation of this object.
* @return the string representation.
*/
@@ -172,8 +183,7 @@
buffer.append("[ [active=").append(active)
.append("] [rsd=").append(rsd)
.append("] [nextMsg=").append(nextMsg).append("(")
- .append(nextMsg != null ?
- new Date(nextMsg.getCSN().getTime()).toString():"")
+ .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
.append(")")
.append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
.append("] [startState=").append(startState)
@@ -183,16 +193,15 @@
}
/**
- * Get the next message eligible regarding
- * the crossDomain eligible CSN. Put it in the context table.
- * @param opid The operation id.
+ * Computes the next message eligible regarding the crossDomain eligible
+ * CSN.
+ *
+ * @param opId The operation id.
*/
- private void getNextEligibleMessageForDomain(String opid)
+ private void computeNextEligibleMessageForDomain(String opId)
{
if (debugEnabled())
- TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDNString() +
- " getNextEligibleMessageForDomain(" + opid+ ") "
- + "ctxt=" + toString());
+ debugInfo(opId, "ctxt=" + this);
assert(nextMsg == null);
try
@@ -202,22 +211,15 @@
// not eligible
if (nextNonEligibleMsg != null)
{
- boolean hasBecomeEligible =
- (nextNonEligibleMsg.getCSN().getTime()
- <= eligibleCSN.getTime());
+ final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
if (debugEnabled())
- TRACER.debugInfo(" In ECLServerHandler, for "
- + mh.getBaseDNString()
- + " getNextEligibleMessageForDomain(" + opid + ") "
- + " stored nonEligibleMsg " + nextNonEligibleMsg
- + " has now become eligible regarding "
- + " the eligibleCSN ("+ eligibleCSN
- + " ):" + hasBecomeEligible);
+ debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
+ + " has now become eligible regarding the eligibleCSN ("
+ + eligibleCSN + " ): " + hasBecomeEligible);
if (hasBecomeEligible)
{
- // it is now eligible
nextMsg = nextNonEligibleMsg;
nextNonEligibleMsg = null;
}
@@ -226,50 +228,30 @@
else
{
// Here comes a new message !!!
- // non blocking
- UpdateMsg newMsg;
- do {
- newMsg = mh.getNextMessage(false);
- // when the replication changelog is trimmed, the last (latest) chg
- // is left in the db (whatever its age), and we don't want this chg
- // to be returned in the external changelog.
- // So let's check if the chg time is older than the trim date
- } while ((newMsg!=null) &&
- (newMsg.getCSN().getTime() < domainLatestTrimDate));
-
- if (debugEnabled())
- TRACER.debugInfo(" In ECLServerHandler, for "
- + mh.getBaseDNString()
- + " getNextEligibleMessageForDomain(" + opid + ") "
- + " got new message : "
- + " baseDN=[" + mh.getBaseDNString()
- + "] [newMsg=" + newMsg + "]" + dumpState());
-
- // in non blocking mode, return null when no more msg
- if (newMsg != null)
+ final UpdateMsg newMsg = getNextMessage();
+ if (newMsg == null)
{
- boolean isEligible = (newMsg.getCSN().getTime()
- <= eligibleCSN.getTime());
+ return;
+ }
if (debugEnabled())
- TRACER.debugInfo(" In ECLServerHandler, for "
- + mh.getBaseDNString()
- + " getNextEligibleMessageForDomain(" + opid+ ") "
- + "newMsg isEligible=" + isEligible + " since "
- + "newMsg=[" + newMsg.getCSN()
- + " " + new Date(newMsg.getCSN().getTime())
- + "] eligibleCSN=[" + eligibleCSN
- + " " + new Date(eligibleCSN.getTime())+"]"
+ debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
+ dumpState());
- if (isEligible)
- {
- nextMsg = newMsg;
- }
- else
- {
- nextNonEligibleMsg = newMsg;
- }
+ final boolean isEligible = isEligible(newMsg);
+
+ if (debugEnabled())
+ debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
+ + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
+ + toString(eligibleCSN) + "] " + dumpState());
+
+ if (isEligible)
+ {
+ nextMsg = newMsg;
+ }
+ else
+ {
+ nextNonEligibleMsg = newMsg;
}
}
}
@@ -279,6 +261,44 @@
}
}
+ private boolean isEligible(UpdateMsg msg)
+ {
+ return msg.getCSN().getTime() <= eligibleCSN.getTime();
+ }
+
+ private UpdateMsg getNextMessage()
+ {
+ while (true)
+ {
+ final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
+
+ if (newMsg == null)
+ { // in non blocking mode, null means no more messages
+ return null;
+ }
+ else if (newMsg.getCSN().getTime() < domainLatestTrimDate)
+ {
+ // when the replication changelog is trimmed, the last (latest) chg
+ // is left in the db (whatever its age), and we don't want this chg
+ // to be returned in the external changelog.
+ // So let's check if the chg time is older than the trim date
+ return newMsg;
+ }
+ }
+ }
+
+ private String toString(CSN csn)
+ {
+ return csn + " " + asDate(csn);
+ }
+
+ private void debugInfo(String opId, String message)
+ {
+ TRACER.debugInfo("In ECLServerHandler, for baseDN="
+ + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
+ + ") " + message);
+ }
+
/**
* Unregister the handler from the DomainContext ReplicationDomain.
* @return Whether the handler has been unregistered with success.
@@ -297,11 +317,6 @@
}
}
- /**
- * The global list of contexts by domain for the search currently processed.
- */
- private DomainContext[] domainCtxts = new DomainContext[0];
-
private String clDomCtxtsToString(String msg)
{
StringBuilder buffer = new StringBuilder();
@@ -313,10 +328,6 @@
return buffer.toString();
}
- private static int UNDEFINED_PHASE = 0;
- private static int INIT_PHASE = 1;
- private static int PERSISTENT_PHASE = 2;
-
/**
* Starts this handler based on a start message received from remote server.
* @param inECLStartMsg The start msg provided by the remote server.
@@ -509,13 +520,18 @@
/**
* Initialize the handler from a provided cookie value.
- * @param crossDomainStartState The provided cookie value.
- * @throws DirectoryException When an error is raised.
+ *
+ * @param providedCookie
+ * The provided cookie value.
+ * @throws DirectoryException
+ * When an error is raised.
*/
- private void initializeCLSearchFromGenState(String crossDomainStartState)
+ private void initializeCLSearchFromCookie(String providedCookie)
throws DirectoryException
{
- initializeChangelogDomainCtxts(crossDomainStartState, false);
+ this.draftCompat = false;
+
+ initializeChangelogDomainCtxts(providedCookie, false);
}
/**
@@ -684,12 +700,9 @@
// Creates the table that will contain the real-time info for each
// and every domain.
Set<DomainContext> tmpSet = new HashSet<DomainContext>();
- String missingDomains = "";
- for (Iterator<ReplicationServerDomain> iter = rs.getDomainIterator();
- iter.hasNext();)
+ final StringBuilder missingDomains = new StringBuilder();
+ for (ReplicationServerDomain rsd : toIterable(rs.getDomainIterator()))
{
- ReplicationServerDomain rsd = iter.next();
-
// skip the 'unreal' changelog domain
if (rsd == this.replicationServerDomain)
continue;
@@ -704,11 +717,12 @@
}
// skip unused domains
- if (rsd.getLatestServerState().isEmpty())
+ final ServerState latestServerState = rsd.getLatestServerState();
+ if (latestServerState.isEmpty())
continue;
// Creates the new domain context
- DomainContext newDomainCtxt = new DomainContext();
+ final DomainContext newDomainCtxt = new DomainContext();
newDomainCtxt.active = true;
newDomainCtxt.rsd = rsd;
newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
@@ -716,7 +730,7 @@
// Assign the start state for the domain
if (isPersistent == PERSISTENT_CHANGES_ONLY)
{
- newDomainCtxt.startState = rsd.getLatestServerState().duplicate();
+ newDomainCtxt.startState = latestServerState;
startStatesFromProvidedCookie.remove(rsd.getBaseDN());
}
else
@@ -746,7 +760,7 @@
// when there is a cookie provided in the request,
if (newDomainCtxt.startState == null)
{
- missingDomains += (rsd.getBaseDN() + ":;");
+ missingDomains.append(rsd.getBaseDN()).append(":;");
continue;
}
else if (!newDomainCtxt.startState.isEmpty())
@@ -760,7 +774,7 @@
}
}
- newDomainCtxt.stopState = rsd.getLatestServerState().duplicate();
+ newDomainCtxt.stopState = latestServerState;
}
newDomainCtxt.currentState = new ServerState();
@@ -799,13 +813,12 @@
- if the cookie contains a domain that is even not replicated
then this case need to be considered here in another loop.
*/
- if (!startStatesFromProvidedCookie.isEmpty())
+ if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
{
- if (allowUnknownDomains)
- for (DN providedDomain : startStatesFromProvidedCookie.keySet())
- if (rs.getReplicationServerDomain(providedDomain) == null)
- // the domain provided in the cookie is not replicated
- startStatesFromProvidedCookie.remove(providedDomain);
+ for (DN providedDomain : startStatesFromProvidedCookie.keySet())
+ if (rs.getReplicationServerDomain(providedDomain) == null)
+ // the domain provided in the cookie is not replicated
+ startStatesFromProvidedCookie.remove(providedDomain);
}
// Now do the final checking
@@ -833,7 +846,7 @@
// Initializes each and every domain with the next(first) eligible message
// from the domain.
for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.getNextEligibleMessageForDomain(operationId);
+ domainCtxt.computeNextEligibleMessageForDomain(operationId);
if (domainCtxt.nextMsg == null)
domainCtxt.active = false;
@@ -1055,53 +1068,6 @@
closeInitPhase();
}
- /* TODO: From replication CSN
- //--
- if (startCLMsg.getStartMode()==2)
- {
- if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN()))
- return;
- }
-
- //--
- if (startCLMsg.getStartMode()==4)
- {
- // to get the CL first and last
- initializeCLDomCtxts(null); // from start
- CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN();
-
- try
- {
- // to get the CL first and last
- // last rely on the crossDomainEligibleCSN thus must have been
- // computed before
- int[] limits = computeCLLimits(crossDomainEligibleCSN);
- // Send the response
- CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
- session.publish(msg);
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- try
- {
- session.publish(
- new ErrorMsg(
- replicationServer.getServerId(),
- serverId,
- Message.raw(Category.SYNC, Severity.INFORMATION,
- "Exception raised: " + e.getMessage())));
- }
- catch(IOException ioe)
- {
- // FIXME: close conn ?
- }
- }
- return;
- }
- */
-
- // Store into domain
registerIntoDomain();
if (debugEnabled())
@@ -1116,7 +1082,7 @@
short requestType = msg.getECLRequestType();
if (requestType == REQUEST_TYPE_FROM_COOKIE)
{
- initializeCLSearchFromGenState(msg.getCrossDomainServerState());
+ initializeCLSearchFromCookie(msg.getCrossDomainServerState());
}
else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER)
{
@@ -1231,12 +1197,7 @@
}
// Build the ECLUpdateMsg to be returned
- final ECLUpdateMsg change = new ECLUpdateMsg(
- (LDAPUpdateMsg) oldestContext.nextMsg,
- null, // cookie will be set later
- oldestContext.rsd.getBaseDN(),
- 0); // changeNumber may be set later
- oldestContext.nextMsg = null;
+ final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
// Default is not to loop, with one exception
continueLooping = false;
@@ -1250,8 +1211,7 @@
// Set and test the domain of the oldestChange see if we reached
// the end of the phase for this domain
- oldestContext.currentState.update(
- change.getUpdateMsg().getCSN());
+ oldestContext.currentState.update(change.getUpdateMsg().getCSN());
if (oldestContext.currentState.cover(oldestContext.stopState)
|| (draftCompat
@@ -1264,7 +1224,7 @@
{
// populates the table with the next eligible msg from iDom
// in non blocking mode, return null when no more eligible msg
- oldestContext.getNextEligibleMessageForDomain(operationId);
+ oldestContext.computeNextEligibleMessageForDomain(operationId);
}
oldestChange = change;
}
@@ -1277,21 +1237,14 @@
+ "looking for the generalized oldest change"));
for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.getNextEligibleMessageForDomain(operationId);
+ domainCtxt.computeNextEligibleMessageForDomain(operationId);
}
DomainContext oldestContext = findOldestChangeFromDomainCtxts();
if (oldestContext != null)
{
- final ECLUpdateMsg change = new ECLUpdateMsg(
- (LDAPUpdateMsg) oldestContext.nextMsg,
- null, // set later
- oldestContext.rsd.getBaseDN(),
- 0);
- oldestContext.nextMsg = null; // clean
-
- oldestContext.currentState.update(
- change.getUpdateMsg().getCSN());
+ final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
+ oldestContext.currentState.update(change.getUpdateMsg().getCSN());
if (draftCompat)
{
@@ -1330,6 +1283,15 @@
return oldestChange;
}
+ private ECLUpdateMsg newECLUpdateMsg(DomainContext ctx)
+ {
+ // cookie will be set later AND changeNumber may be set later
+ final ECLUpdateMsg change = new ECLUpdateMsg(
+ (LDAPUpdateMsg) ctx.nextMsg, null, ctx.rsd.getBaseDN(), 0);
+ ctx.nextMsg = null; // clean after use
+ return change;
+ }
+
/**
* Either retrieves a change number from the DB, or assign a new change number
* and store in the DB.
@@ -1346,13 +1308,11 @@
private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
throws ChangelogException
{
- // We also need to check if the CNIndexDB is consistent with
- // the changelogdb.
- // if not, 2 potential reasons
- // a/ : changelog has been purged (trim)let's traverse the CNIndexDB
- // b/ : changelog is late .. let's traverse the changelogDb
- // The following loop allows to loop until being on the same cn
- // in the 2 dbs
+ // We also need to check if the CNIndexDB is consistent with the
+ // changelogDB. If not, 2 potential reasons:
+ // a/ changelog has been purged (trim) let's traverse the CNIndexDB
+ // b/ changelog is late ... let's traverse the changelogDb
+ // The following loop allows to loop until being on the same cn in the 2 dbs
// replogCSN : the oldest change from the changelog db
CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
@@ -1374,20 +1334,17 @@
final DN dnFromCNIndexDB = currentRecord.getBaseDN();
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() generating change number "
- + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
- + csnFromChangelogDb + " timestamps:"
- + new Date(csnFromChangelogDb.getTime()) + " ?older"
- + new Date(csnFromCNIndexDB.getTime()));
-
+ TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
+ + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
+ + asDate(csnFromChangelogDb) + " ?older"
+ + asDate(csnFromCNIndexDB));
if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
csnFromCNIndexDB, dnFromCNIndexDB))
{
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() generating change number "
- + " assigning changeNumber=" + currentRecord.getChangeNumber()
- + " to change=" + oldestChange);
+ TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
+ + currentRecord.getChangeNumber() + " to change=" + oldestChange);
oldestChange.setChangeNumber(currentRecord.getChangeNumber());
return true;
@@ -1398,11 +1355,11 @@
{
// the change from the changelogDb is older
// it should have been stored lately
- // let's continue to traverse the changelogdb
+ // let's continue to traverse the changelogDB
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber(): will skip "
+ TRACER.debugInfo("assignChangeNumber() will skip "
+ csnFromChangelogDb
- + " and read next from the regular changelog.");
+ + " and read next change from the regular changelog.");
return false; // TO BE CHECKED
}
@@ -1413,18 +1370,17 @@
try
{
// let's traverse the CNIndexDB searching for the change
- // found in the changelogDb.
+ // found in the changelogDB
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() generating change number "
- + " will skip " + csnFromCNIndexDB
+ TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
+ " and read next change from the CNIndexDB.");
isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() generating change number has"
- + "skipped to changeNumber=" + currentRecord.getChangeNumber()
- + " csn=" + currentRecord.getCSN() + " End of CNIndexDB ?"
+ TRACER.debugInfo("assignChangeNumber() has skipped to changeNumber="
+ + currentRecord.getChangeNumber() + " csn="
+ + currentRecord.getCSN() + " End of CNIndexDB ?"
+ isEndOfCNIndexDBReached);
}
catch (ChangelogException e)
@@ -1439,6 +1395,11 @@
}
}
+ private Date asDate(CSN csn)
+ {
+ return new Date(csn.getTime());
+ }
+
private boolean areSameChange(CSN csn1, DN dn1, CSN csn2, DN dn2)
{
boolean sameDN = dn1.compareTo(dn2) == 0;
--
Gitblit v1.10.0