From 4fe72a4bef946169b0f50bc05bd9dc3b4b1131d3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 14 Aug 2009 12:37:19 +0000
Subject: [PATCH] Support for External change log compatible with draft-good-ldap-changelog-04.txt , March 2003
---
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 1510 +++++++++++++++++++++++++++++----------------------------
1 files changed, 761 insertions(+), 749 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index d0565d6..5272f50 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,12 +27,14 @@
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -41,7 +43,6 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
@@ -52,7 +53,7 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
-import org.opends.server.util.TimeThread;
+import org.opends.server.util.ServerConstants;
/**
* This class defines a server handler, which handles all interaction with a
@@ -61,128 +62,234 @@
public class ECLServerHandler extends ServerHandler
{
- // Properties filled only if remote server is a RS
- private String serverAddressURL;
-
+ // This is a string identifying the operation, provided by the client part
+ // of the ECL, used to help interpretation of messages logged.
String operationId;
+ // Iterator on the draftCN database.
+ private DraftCNDbIterator draftCNDbIter = null;
+
+ boolean draftCompat = false;
/**
- * CLDomainContext : contains the state properties for the search
- * currently being processed, by replication domain.
+ * Specifies the last draft changer number (seqnum) requested.
*/
- private class CLDomainContext
+ public int lastDraftCN = 0;
+ /**
+ * Specifies whether the draft change number (seqnum) db has been read until
+ * its end.
+ */
+ public boolean isEndOfDraftCNReached = false;
+ /**
+ * Specifies whether the current search has been requested to be persistent
+ * or not.
+ */
+ public short isPersistent;
+ /**
+ * Specifies the current search phase : INIT or PERSISTENT.
+ */
+ public int searchPhase = INIT_PHASE;
+ /**
+ * Specifies the cookie contained in the request, specifying where
+ * to start serving the ECL.
+ */
+ public String startCookie;
+ /**
+ * Specifies the value of the cookie before the change currently processed
+ * is returned. It is updated with the change number of the change
+ * currently processed (thus becoming the "current" cookie just
+ * before the change is returned.
+ */
+ public MultiDomainServerState previousCookie =
+ new MultiDomainServerState();
+ /**
+ * Specifies the excluded DNs (like cn=admin, ...).
+ */
+ public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
+
+ /**
+ * Eligible changeNumber - only changes older or equal to eligibleCN
+ * are published in the ECL.
+ */
+ public ChangeNumber eligibleCN = null;
+
+ /**
+ * Provides a string representation of this object.
+ * @return the string representation.
+ */
+ public String dumpState()
{
- ReplicationServerDomain rsd; // the repl server domain
- boolean active; // is the domain still active
- MessageHandler mh; // the message handler associated
- UpdateMsg nextMsg;
- UpdateMsg nonElligiblemsg;
+ return new String(
+ this.getClass().getCanonicalName() +
+ "[" +
+ "[draftCompat=" + draftCompat +
+ "] [persistent=" + isPersistent +
+ "] [lastDraftCN=" + lastDraftCN +
+ "] [isEndOfDraftCNReached=" + isEndOfDraftCNReached +
+ "] [searchPhase=" + searchPhase +
+ "] [startCookie=" + startCookie +
+ "] [previousCookie=" + previousCookie +
+ "]]");
+ }
+
+ /**
+ * Class that manages the 'by domain' state variables for the search being
+ * currently processed on the ECL.
+ * For example :
+ * if search on 'cn=changelog' is being processed when 2 replicated domains
+ * dc=us and dc=europe are configured, then there will be 2 DomainContext
+ * used, one for ds=us, and one for dc=europe.
+ */
+ private class DomainContext
+ {
+ ReplicationServerDomain rsd;
+
+ boolean active; // active when there are still changes
+ // supposed eligible for the ECL.
+
+ MessageHandler mh; // the message handler from which are read
+ // the changes for this domain
+ private UpdateMsg nextMsg;
+ private UpdateMsg nextNonEligibleMsg;
ServerState startState;
ServerState currentState;
ServerState stopState;
/**
- * Add to the provider buffer a string representation of this object.
+ * {@inheritDoc}
*/
- public void toString(StringBuilder buffer, int i)
+ @Override
+ public String toString()
{
- CLDomainContext xx = clDomCtxts[i];
+ StringBuilder buffer = new StringBuilder();
+ toString(buffer);
+ return buffer.toString();
+ }
+ /**
+ * Provide a string representation of this object for debug purpose..
+ */
+ public void toString(StringBuilder buffer)
+ {
buffer.append(
- " clDomCtxts(" + i + ") [act=" + xx.active +
- " rsd=" + rsd +
- " nextMsg=" + nextMsg + "(" +
+ "[ [active=" + active +
+ "] [rsd=" + rsd +
+ "] [nextMsg=" + nextMsg + "(" +
(nextMsg != null?
new Date(nextMsg.getChangeNumber().getTime()).toString():"")
+ ")" +
- " nextNonEligibleMsg=" + nonElligiblemsg +
- " startState=" + startState +
- " stopState= " + stopState +
- " currState= " + currentState + "]");
+ "] [nextNonEligibleMsg=" + nextNonEligibleMsg +
+ "] [startState=" + startState +
+ "] [stopState= " + stopState +
+ "] [currentState= " + currentState + "]]");
+ }
+
+ /**
+ * Get the next message elligible regarding
+ * the crossDomain elligible CN. Put it in the context table.
+ * @param opid The operation id.
+ */
+ private void getNextEligibleMessageForDomain(String opid)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
+ " getNextEligibleMessageForDomain(" + opid+ ") "
+ + "ctxt=" + toString());
+
+ assert(nextMsg == null);
+ try
+ {
+ // Before get a new message from the domain, evaluate in priority
+ // a message that has not been published to the ECL because it was
+ // not eligible
+ if (nextNonEligibleMsg != null)
+ {
+ boolean hasBecomeEligible =
+ (nextNonEligibleMsg.getChangeNumber().getTime()
+ <= eligibleCN.getTime());
+
+ if (debugEnabled())
+ TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
+ " getNextEligibleMessageForDomain(" + opid+ ") "
+ + " stored nonEligibleMsg " + nextNonEligibleMsg
+ + " has now become eligible regarding "
+ + " the eligibleCN ("+ eligibleCN
+ + " ):" + hasBecomeEligible);
+
+ if (hasBecomeEligible)
+ {
+ // it is now elligible
+ nextMsg = nextNonEligibleMsg;
+ nextNonEligibleMsg = null;
+ }
+ else
+ {
+ // the oldest is still not elligible - let's wait next
+ }
+ }
+ else
+ {
+ // Here comes a new message !!!
+ // non blocking
+ UpdateMsg newMsg = mh.getnextMessage(false);
+
+ if (debugEnabled())
+ TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
+ " getNextEligibleMessageForDomain(" + opid+ ") "
+ + " got new message : "
+ + " serviceId=[" + mh.getServiceId()
+ + "] [newMsg=" + newMsg + "]" + dumpState());
+
+ // in non blocking mode, return null when no more msg
+ if (newMsg != null)
+ {
+ boolean isEligible = (newMsg.getChangeNumber().getTime()
+ <= eligibleCN.getTime());
+
+ if (debugEnabled())
+ TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId()
+ + " getNextEligibleMessageForDomain(" + opid+ ") "
+ + "newMsg isEligible=" + isEligible + " since "
+ + "newMsg=[" + newMsg.getChangeNumber()
+ + " " + new Date(newMsg.getChangeNumber().getTime()).toString()
+ + "] eligibleCN=[" + eligibleCN
+ + " " + new Date(eligibleCN.getTime()).toString()+"]"
+ + dumpState());
+
+ if (isEligible)
+ {
+ nextMsg = newMsg;
+ }
+ else
+ {
+ nextNonEligibleMsg = newMsg;
+ }
+ }
+ }
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
}
- // The list of contexts by domain for the current search
- CLDomainContext[] clDomCtxts = new CLDomainContext[0];
+ // The global list of contexts by domain for the search currently processed.
+ DomainContext[] domainCtxts = new DomainContext[0];
- private void clDomCtxtsToString(String msg)
+ private String clDomCtxtsToString(String msg)
{
StringBuilder buffer = new StringBuilder();
buffer.append(msg+"\n");
- for (int i=0;i<clDomCtxts.length;i++)
+ for (int i=0;i<domainCtxts.length;i++)
{
- clDomCtxts[i].toString(buffer, i);
+ domainCtxts[i].toString(buffer);
buffer.append("\n");
}
- TRACER.debugInfo(
- "In " + this.getName() + " clDomCtxts: " + buffer.toString());
+ return buffer.toString();
}
- /**
- * Class that manages the state variables for the current search on the ECL.
- */
- private class CLTraverseCtxt
- {
- /**
- * Specifies the next changer number (seqnum), -1 when not.
- */
- public int nextSeqnum;
- /**
- * Specifies whether the current search has been requested to be persistent
- * or not.
- */
- public short isPersistent;
- /**
- * Specifies the last changer number (seqnum) requested.
- */
- public int stopSeqnum;
- /**
- * Specifies whether the change number (seqnum) db has been read until
- * its end.
- */
- public boolean endOfSeqnumdbReached = false;
- /**
- * Specifies the current search phase.
- * 1 = init
- * 2 = persistent
- */
- public int searchPhase = 1;
- /**
- * Specifies the cookie contained in the request, specifying where
- * to start serving the ECL.
- */
- public String generalizedStartState;
- /**
- * Specifies the current cookie value.
- */
- public MultiDomainServerState currentCookie =
- new MultiDomainServerState();
- /**
- * Specifies the excluded DNs.
- */
- public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
-
- /**
- * Provides a string representation of this object.
- * @return the string representation.
- */
- public String toString()
- {
- return new String(
- this.getClass().getCanonicalName() +
- ":[" +
- " nextSeqnum=" + nextSeqnum +
- " persistent=" + isPersistent +
- " stopSeqnum" + stopSeqnum +
- " endOfSeqnumdbReached=" + endOfSeqnumdbReached +
- " searchPhase=" + searchPhase +
- " generalizedStartState=" + generalizedStartState +
- "]");
- }
-
- }
-
- // The context of the current search
- private CLTraverseCtxt cLSearchCtxt = new CLTraverseCtxt();
+ static int UNDEFINED_PHASE = 0;
+ static int INIT_PHASE = 1;
+ static int PERSISTENT_PHASE = 2;
/**
* Starts this handler based on a start message received from remote server.
@@ -199,10 +306,6 @@
inECLStartMsg.getVersion());
generationId = inECLStartMsg.getGenerationId();
serverURL = inECLStartMsg.getServerURL();
- int separator = serverURL.lastIndexOf(':');
- serverAddressURL =
- session.getRemoteAddress() + ":" + serverURL.substring(separator +
- 1);
setInitialServerState(inECLStartMsg.getServerState());
setSendWindowSize(inECLStartMsg.getWindowSize());
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -211,8 +314,6 @@
// Only V2 protocol has the group id in repl server start message
this.groupId = inECLStartMsg.getGroupId();
}
- // FIXME:ECL Any generationID must be removed, it makes no sense here.
- oldGenerationId = -100;
}
catch(Exception e)
{
@@ -243,7 +344,7 @@
replicationServer, rcvWindowSize);
try
{
- setServiceIdAndDomain("cn=changelog");
+ setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
}
catch(DirectoryException de)
{
@@ -268,12 +369,12 @@
StartECLSessionMsg startECLSessionMsg)
throws DirectoryException
{
- // FIXME:ECL queueSize is hard coded to 1 else Handler hangs for some reason
+ // queueSize is hard coded to 1 else super class hangs for some reason
super(null, 1, replicationServerURL, replicationServerId,
replicationServer, 0);
try
{
- setServiceIdAndDomain("cn=changelog");
+ setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
}
catch(DirectoryException de)
{
@@ -369,115 +470,153 @@
/**
* Initialize the handler from a provided cookie value.
- * @param providedGeneralizedStartState The provided cookie value.
+ * @param crossDomainStartState The provided cookie value.
* @throws DirectoryException When an error is raised.
*/
- public void initializeCLSearchFromGenState(
- String providedGeneralizedStartState)
+ public void initializeCLSearchFromGenState(String crossDomainStartState)
throws DirectoryException
{
- this.cLSearchCtxt.nextSeqnum = -1; // will not generate seqnum
- initializeCLDomCtxts(providedGeneralizedStartState);
+ initializeCLDomCtxts(crossDomainStartState);
+ }
+
+ /**
+ * Initialize the handler from a provided draft first change number.
+ * @param startDraftCN The provided draft first change number.
+ * @throws DirectoryException When an error is raised.
+ */
+ public void initializeCLSearchFromDraftCN(int startDraftCN)
+ throws DirectoryException
+ {
+ String crossDomainStartState;
+
+ draftCompat = true;
+
+ DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler();
+ if (startDraftCN < 0)
+ {
+ // Request filter does not contain any firstDraftCN
+ // So we'll generate from the beginning of what we have stored here.
+
+ // Get the first DraftCN from DraftCNdb
+ if (draftCNDb.count() == 0)
+ {
+ // db is empty
+ isEndOfDraftCNReached = true;
+ crossDomainStartState = null;
+ }
+ else
+ {
+ // get the generalizedServerState related to the start of the draftDb
+ crossDomainStartState = draftCNDb.getValue(draftCNDb.getFirstKey());
+
+ // Get an iterator to traverse the draftCNDb
+ try
+ {
+ draftCNDbIter =
+ draftCNDb.generateIterator(draftCNDb.getFirstKey());
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+
+ if (draftCNDbIter != null)
+ draftCNDbIter.releaseCursor();
+
+ throw new DirectoryException(
+ ResultCode.OPERATIONS_ERROR,
+ Message.raw(Category.SYNC,
+ Severity.FATAL_ERROR,"Server Error."));
+ }
+ }
+ }
+ else
+ {
+ // Request filter does contain a startDraftCN
+
+ // Read the draftCNDb to see whether it contains startDraftCN
+ crossDomainStartState = draftCNDb.getValue(startDraftCN);
+
+ if (crossDomainStartState != null)
+ {
+ // startDraftCN is present in the draftCnDb
+ // Get an iterator to traverse the draftCNDb
+ try
+ {
+ draftCNDbIter =
+ draftCNDb.generateIterator(draftCNDb.getFirstKey());
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+
+ if (draftCNDbIter != null)
+ draftCNDbIter.releaseCursor();
+
+ throw new DirectoryException(
+ ResultCode.OPERATIONS_ERROR,
+ Message.raw(Category.SYNC,
+ Severity.FATAL_ERROR,"Server Error."));
+ }
+ }
+ else
+ {
+ // startDraftCN provided in the request is not present in the draftCnDb
+ // Is the provided startDraftCN <= the potential last DraftCNdb
+
+ // Get the draftLimits (from the eligibleCN got at the beginning of
+ // the operation.
+ int[] limits = getECLDraftCNLimits(eligibleCN);
+
+ if (startDraftCN<=limits[1])
+ {
+ // startDraftCN is between first and last and has never been
+ // returned yet
+ crossDomainStartState = draftCNDb.getValue(draftCNDb.getLastKey());
+ // FIXME:ECL ... ok we'll start from the end of the draftCNDb BUT ...
+ // this is NOT the request of the client !!!!
+ }
+ else
+ {
+ throw new DirectoryException(
+ ResultCode.SUCCESS,
+ Message.raw(Category.SYNC,
+ Severity.INFORMATION,"Bad value provided for change number "
+ + " Failed to match a replication state to "+startDraftCN));
+ }
+ }
+ }
+ this.draftCompat = true;
+
+ initializeCLDomCtxts(crossDomainStartState);
+
}
/**
* Initialize the context for each domain.
- * @param providedGeneralizedStartState the provided generalized state
+ * @param providedCookie the provided generalized state
* @throws DirectoryException When an error occurs.
*/
- public void initializeCLDomCtxts(String providedGeneralizedStartState)
+ public void initializeCLDomCtxts(String providedCookie)
throws DirectoryException
{
HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
ReplicationServer rs = replicationServerDomain.getReplicationServer();
- try
- {
- // Initialize start state for all running domains with empty state
- Iterator<ReplicationServerDomain> rsdk = rs.getCacheIterator();
- if (rsdk != null)
- {
- while (rsdk.hasNext())
- {
- // process a domain
- ReplicationServerDomain rsd = rsdk.next();
- // skip the changelog domain
- if (rsd == this.replicationServerDomain)
- continue;
- startStates.put(rsd.getBaseDn(), new ServerState());
- }
- }
-
- // Overwrite start state from the cookie provided in the request
- if ((providedGeneralizedStartState != null) &&
- (providedGeneralizedStartState.length()>0))
- {
- String[] domains = providedGeneralizedStartState.split(";");
- for (String domainState : domains)
- {
- // Split baseDN and serverState
- String[] fields = domainState.split(":");
-
- // BaseDN - Check it
- String domainBaseDNReceived = fields[0];
- if (!startStates.containsKey(domainBaseDNReceived))
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
- "unknown " + domainBaseDNReceived));
-
- // ServerState
- ServerState domainServerState = new ServerState();
- if (fields.length>1)
- {
- String strState = fields[1];
- String[] strCN = strState.split(" ");
- for (String sr : strCN)
- {
- ChangeNumber fromChangeNumber = new ChangeNumber(sr);
- domainServerState.update(fromChangeNumber);
- }
- }
- startStates.put(domainBaseDNReceived, domainServerState);
-
- // FIXME: ECL first cookie value check
- // ECL For each of the provided state, it this state is older
- // than the older change stored in the replication changelog ....
- // then a purge occured since the time the cookie was published
- // it is recommended to do a full resync
- ReplicationServerDomain rsd =
- rs.getReplicationServerDomain(domainBaseDNReceived, false);
- ServerState domainStartState = rsd.getStartState();
- if (!domainServerState.cover(domainStartState))
- {
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
- "too old cookie provided " + providedGeneralizedStartState
- + " first acceptable change for " + rsd.getBaseDn()
- + " is " + rsd.getStartState()));
- }
- }
- }
- }
- catch(DirectoryException de)
- {
- throw de;
- }
- catch(Exception e)
- {
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
- "Exception raised: " + e.getMessage()));
- }
+ // Parse the provided cookie and overwrite startState from it.
+ if ((providedCookie != null) && (providedCookie.length()!=0))
+ startStates =
+ MultiDomainServerState.splitGenStateToServerStates(providedCookie);
try
{
- // Now traverse all domains and build the initial changelog context
- Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator();
+ // Now traverse all domains and build all the initial contexts :
+ // - the global one : dumpState()
+ // - the domain by domain ones : domainCtxts
+ Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
// Creates the table that will contain the real-time info by domain.
- clDomCtxts = new CLDomainContext[rs.getCacheSize()-1
- -this.cLSearchCtxt.excludedServiceIDs.size()];
+ HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
int i =0;
if (rsdi != null)
{
@@ -491,73 +630,87 @@
continue;
// skip the excluded domains
- boolean excluded = false;
- for(String excludedServiceID : this.cLSearchCtxt.excludedServiceIDs)
- {
- if (excludedServiceID.equalsIgnoreCase(rsd.getBaseDn()))
- {
- excluded=true;
- break;
- }
- }
- if (excluded)
+ if (isServiceIDExcluded(rsd.getBaseDn()))
continue;
- // Creates the context record
- CLDomainContext newContext = new CLDomainContext();
- newContext.active = true;
- newContext.rsd = rsd;
+ // Creates the new domain context
+ DomainContext newDomainCtxt = new DomainContext();
+ newDomainCtxt.active = true;
+ newDomainCtxt.rsd = rsd;
- if (this.cLSearchCtxt.isPersistent ==
+ // Assign the start state for the domain
+ if (isPersistent ==
StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
{
- newContext.startState = rsd.getCLElligibleState();
+ newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
}
else
{
- newContext.startState = startStates.get(rsd.getBaseDn());
- newContext.stopState = rsd.getCLElligibleState();
+ newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
+ if ((providedCookie==null)||(providedCookie.isEmpty()))
+ newDomainCtxt.startState = new ServerState();
+ else
+ if (newDomainCtxt.startState == null)
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
+ "missing " + rsd.getBaseDn()));
+ newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
}
- newContext.currentState = new ServerState();
+ newDomainCtxt.currentState = new ServerState();
- // Creates an unconnected SH
+ // Creates an unconnected SH for the domain
MessageHandler mh = new MessageHandler(maxQueueSize,
replicationServerURL, replicationServerId, replicationServer);
// set initial state
- mh.setInitialServerState(newContext.startState);
+ mh.setInitialServerState(newDomainCtxt.startState);
// set serviceID and domain
mh.setServiceIdAndDomain(rsd.getBaseDn());
- // register into domain
+ // register the unconnected into the domain
rsd.registerHandler(mh);
- newContext.mh = mh;
+ newDomainCtxt.mh = mh;
+
+ previousCookie.update(
+ newDomainCtxt.rsd.getBaseDn(),
+ newDomainCtxt.startState);
// store the new context
- clDomCtxts[i] = newContext;
+ tmpSet.add(newDomainCtxt);
i++;
}
}
-
- // the next record from the seqnumdb should be the one
- cLSearchCtxt.endOfSeqnumdbReached = false;
- cLSearchCtxt.generalizedStartState = providedGeneralizedStartState;
-
- // Initializes all domain with the next elligible message
- for (int j=0; j<clDomCtxts.length; j++)
+ if (!startStates.isEmpty())
{
- this.getNextElligibleMessage(j);
- if (clDomCtxts[j].nextMsg == null)
- clDomCtxts[j].active = false;
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
+ "unknown " + startStates.toString()));
+ }
+ domainCtxts = tmpSet.toArray(new DomainContext[0]);
+
+ // the next record from the DraftCNdb should be the one
+ startCookie = providedCookie;
+
+ // Initializes all domain with the next(first) elligible message
+ for (int j=0; j<domainCtxts.length; j++)
+ {
+ domainCtxts[j].getNextEligibleMessageForDomain(operationId);
+
+ if (domainCtxts[j].nextMsg == null)
+ domainCtxts[j].active = false;
}
}
catch(Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ // FIXME:ECL do not publish internal exception plumb to the client
throw new DirectoryException(
ResultCode.OPERATIONS_ERROR,
Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " +
- e.getLocalizedMessage()),
- e);
+ e),
+ e);
}
+ if (debugEnabled())
+ TRACER.debugInfo(
+ " initializeCLDomCtxts ends with " + " " + dumpState());
}
/**
@@ -569,21 +722,22 @@
}
/**
- * Shutdown this handler ServerHandler.
+ * Shutdown this handler.
*/
public void shutdown()
{
- for (int i=0;i<clDomCtxts.length;i++)
+ for (int i=0;i<domainCtxts.length;i++)
{
- if (!clDomCtxts[i].rsd.unRegisterHandler(clDomCtxts[i].mh))
+ if (!domainCtxts[i].rsd.unRegisterHandler(domainCtxts[i].mh))
{
- TRACER.debugInfo(this +" shutdown() Internal error " +
- " when unregistering "+ clDomCtxts[i].mh);
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ this +" shutdown() - error when unregistering handler "
+ + domainCtxts[i].mh));
}
- clDomCtxts[i].rsd.stopServer(clDomCtxts[i].mh);
+ domainCtxts[i].rsd.stopServer(domainCtxts[i].mh);
}
super.shutdown();
- clDomCtxts = null;
+ domainCtxts = null;
}
/**
@@ -629,7 +783,7 @@
attributes.add(Attributes.create("External-Changelog-Server",
serverURL));
- // FIXME:ECL No monitoring exist for ECL.
+ // TODO:ECL No monitoring exist for ECL.
return attributes;
}
/**
@@ -640,8 +794,9 @@
{
String localString;
localString = "External changelog Server ";
- if (this.cLSearchCtxt==null)
- localString += serverId + " " + serverURL + " " + getServiceId();
+ if (this.serverId != 0)
+ localString += serverId + " " + serverURL + " " + getServiceId()
+ + " " + this.getOperationId();
else
localString += this.getName();
return localString;
@@ -652,18 +807,9 @@
*/
public ServerStatus getStatus()
{
- // FIXME:ECL Sould ECLServerHandler manage a ServerStatus ?
- return ServerStatus.INVALID_STATUS;
- }
- /**
- * Retrieves the Address URL for this server handler.
- *
- * @return The Address URL for this server handler,
- * in the form of an IP address and port separated by a colon.
- */
- public String getServerAddressURL()
- {
- return serverAddressURL;
+ // There is no other status possible for the ECL Server Handler to
+ // be normally connected.
+ return ServerStatus.NORMAL_STATUS;
}
/**
* {@inheritDoc}
@@ -684,29 +830,34 @@
{
//
- this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
- this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
- this.setConsumerActive(true);
- this.cLSearchCtxt.searchPhase = 1;
+ //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
+ //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
+ //this.setConsumerActive(true);
+
this.operationId = startECLSessionMsg.getOperationId();
this.setName(this.getClass().getCanonicalName()+ " " + operationId);
- if (eligibleCNComputerThread==null)
- eligibleCNComputerThread = new ECLEligibleCNComputerThread();
-
- cLSearchCtxt.isPersistent = startECLSessionMsg.isPersistent();
- cLSearchCtxt.stopSeqnum = startECLSessionMsg.getLastDraftChangeNumber();
- cLSearchCtxt.searchPhase = 1;
- cLSearchCtxt.currentCookie = new MultiDomainServerState(
+ isPersistent = startECLSessionMsg.isPersistent();
+ lastDraftCN = startECLSessionMsg.getLastDraftChangeNumber();
+ searchPhase = INIT_PHASE;
+ previousCookie = new MultiDomainServerState(
startECLSessionMsg.getCrossDomainServerState());
- cLSearchCtxt.excludedServiceIDs=startECLSessionMsg.getExcludedServiceIDs();
+ excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
+ replicationServer.disableEligibility(excludedServiceIDs);
+ eligibleCN = replicationServer.getEligibleCN();
- //--
- if (startECLSessionMsg.getECLRequestType()==0)
+ if (startECLSessionMsg.getECLRequestType()==
+ StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE)
{
initializeCLSearchFromGenState(
startECLSessionMsg.getCrossDomainServerState());
}
+ else if (startECLSessionMsg.getECLRequestType()==
+ StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER)
+ {
+ initializeCLSearchFromDraftCN(
+ startECLSessionMsg.getFirstDraftChangeNumber());
+ }
if (session != null)
{
@@ -735,21 +886,15 @@
// Resume the writer
((ECLServerWriter)writer).resumeWriter();
- // FIXME:ECL Potential race condition if writer not yet resumed here
+ // TODO:ECL Potential race condition if writer not yet resumed here
}
- if (cLSearchCtxt.isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
+ if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
{
- closePhase1();
+ closeInitPhase();
}
- /* TODO: Good Draft Compat
- //--
- if (startCLMsg.getStartMode()==1)
- {
- initializeCLSearchFromProvidedSeqnum(startCLMsg.getSequenceNumber());
- }
-
+ /* TODO: From replication changenumber
//--
if (startCLMsg.getStartMode()==2)
{
@@ -762,14 +907,14 @@
{
// to get the CL first and last
initializeCLDomCtxts(null); // from start
- ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN();
+ ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN();
try
{
// to get the CL first and last
- // last rely on the crossDomainElligibleCN thhus must have been
+ // last rely on the crossDomainEligibleCN thhus must have been
// computed before
- int[] limits = computeCLLimits(crossDomainElligibleCN);
+ int[] limits = computeCLLimits(crossDomainEligibleCN);
// Send the response
CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
session.publish(msg);
@@ -793,11 +938,17 @@
}
return;
}
- Good Draft Compat */
+ */
// Store into domain
registerIntoDomain();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ this.getName() + " initialized: " +
+ " " + dumpState() + " " +
+ " " + clDomCtxtsToString(""));
+
}
/**
@@ -812,30 +963,12 @@
throws DirectoryException
{
boolean interrupted = true;
- ECLUpdateMsg msg = getnextUpdate();
+ ECLUpdateMsg msg = getNextECLUpdate();
- // FIXME:ECL We should refactor so that a SH always have a session
+ // TODO:ECL We should refactor so that a SH always have a session
if (session == null)
return msg;
- /*
- * When we remove a message from the queue we need to check if another
- * server is waiting in flow control because this queue was too long.
- * This check might cause a performance penalty an therefore it
- * is not done for every message removed but only every few messages.
- */
- /** FIXME:ECL checkAllSaturation makes no sense for ECLServerHandler ?
- if (++saturationCount > 10)
- {
- saturationCount = 0;
- try
- {
- replicationServerDomain.checkAllSaturation();
- } catch (IOException e)
- {
- }
- }
- */
boolean acquired = false;
do
{
@@ -857,16 +990,17 @@
/**
* Get the next message - non blocking - null when none.
- *
- * @param synchronous - not used - always non blocking.
- * @return the next message - null when none.
+ * This method is currently not used but we don't want to keep the mother
+ * class method since it make no sense for ECLServerHandler.
+ * @param synchronous - not used
+ * @return the next message
*/
protected UpdateMsg getnextMessage(boolean synchronous)
{
UpdateMsg msg = null;
try
{
- ECLUpdateMsg eclMsg = getnextUpdate();
+ ECLUpdateMsg eclMsg = getNextECLUpdate();
if (eclMsg!=null)
msg = eclMsg.getUpdateMsg();
}
@@ -878,60 +1012,24 @@
}
/**
- * Get the next external changelog update.
- *
- * @return The ECL update, null when none.
- * @exception DirectoryException when any problem occurs.
+ * Returns the next update message for the External Changelog (ECL).
+ * @return the ECL update message, null when there aren't anymore.
+ * @throws DirectoryException when an error occurs.
*/
- protected ECLUpdateMsg getnextUpdate()
+ public ECLUpdateMsg getNextECLUpdate()
throws DirectoryException
{
- return getGeneralizedNextECLUpdate(this.cLSearchCtxt);
- }
+ ECLUpdateMsg oldestChange = null;
- /**
- * Computes the cross domain eligible message (non blocking).
- * Return null when search is covered
- */
- private ECLUpdateMsg getGeneralizedNextECLUpdate(CLTraverseCtxt cLSearchCtxt)
- throws DirectoryException
- {
- ECLUpdateMsg theOldestChange = null;
- try
- {
+ if (debugEnabled())
TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + "," + this +
- " getGeneralizedNextECLUpdate starts with ctxt="
- + cLSearchCtxt);
+ " getNextECLUpdate starts: " + dumpState());
- if ((cLSearchCtxt.nextSeqnum != -1) &&
- (!cLSearchCtxt.endOfSeqnumdbReached))
- {
- /* TODO:ECL G Good changelog draft compat.
- // First time , initialise the cursor to traverse the seqnumdb
- if (seqnumDbReadIterator == null)
- {
- try
- {
- seqnumDbReadIterator = replicationServerDomain.getReplicationServer().
- getSeqnumDbHandler().generateIterator(cLSearchCtxt.nextSeqnum);
- TRACER.debugInfo("getGeneralizedNextMessage(): "
- + " creates seqnumDbReadIterator from nextSeqnum="
- + cLSearchCtxt.nextSeqnum
- + " 1rst=" + seqnumDbReadIterator.getSeqnum()
- + " CN=" + seqnumDbReadIterator.getChangeNumber()
- + cLSearchCtxt);
- }
- catch(Exception e)
- {
- cLSearchCtxt.endOfSeqnumdbReached = true;
- }
- }
- */
- }
+ try
+ {
-
- // Search / no seqnum / not persistent
+ // Search / no DraftCN / not persistent
// -----------------------------------
// init: all domain are candidate
// get one msg from each
@@ -958,136 +1056,136 @@
// if one domain has no msg, still is candidate
int iDom = 0;
- boolean nextclchange = true;
- while ((nextclchange) && (cLSearchCtxt.searchPhase==1))
+ boolean continueLooping = true;
+ while ((continueLooping) && (searchPhase == INIT_PHASE))
{
-
// Step 1 & 2
- if (cLSearchCtxt.searchPhase==1)
+ if (searchPhase == INIT_PHASE)
{
- if (debugEnabled())
- clDomCtxtsToString("In getGeneralizedNextMessage : " +
- "looking for the generalized oldest change");
+ // Normally we whould not loop .. except ...
+ continueLooping = false;
- // Retrieves the index in the subx table of the
- // generalizedOldestChange
- iDom = getGeneralizedOldestChange();
+ iDom = getOldestChangeFromDomainCtxts();
- // idomain != -1 means that we got one
- // generalized oldest change to process
- if (iDom==-1)
+ // iDom == -1 means that there is no oldest change to process
+ if (iDom == -1)
{
- closePhase1();
+ closeInitPhase();
- // signify end of phase 1 to the caller
+ // signals end of phase 1 to the caller
return null;
}
- // idomain != -1 means that we got one
- // generalized oldest change to process
- String suffix = this.clDomCtxts[iDom].rsd.getBaseDn();
- theOldestChange = new ECLUpdateMsg(
- (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg,
- null, // set later
- suffix);
+ // Build the ECLUpdateMsg to be returned
+ oldestChange = new ECLUpdateMsg(
+ (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
+ null, // cookie will be set later
+ domainCtxts[iDom].rsd.getBaseDn(),
+ 0); // draftChangeNumber may be set later
+ domainCtxts[iDom].nextMsg = null;
- nextclchange = false;
-
- /* TODO:ECL G Good change log draft compat.
- if (cLSearchCtxt.nextSeqnum!=-1)
+ if (draftCompat)
{
- // Should either retrieve or generate a seqnum
- // we also need to check if the seqnumdb is acccurate reagrding
+ // either retrieve a draftCN from the draftCNDb
+ // or assign a new draftCN and store in the db
+
+ DraftCNDbHandler draftCNDb=replicationServer.getDraftCNDbHandler();
+
+ // We also need to check if the draftCNdb is consistent with
// the changelogdb.
// if not, 2 potential reasons
- // -1 : purge from the changelog .. let's traverse the seqnumdb
- // -2 : changelog is late .. let's traverse the changelog
+ // a/ : changelog has been purged (trim)let's traverse the draftCNDb
+ // b/ : changelog is late .. let's traverse the changelogDb
+ // The following loop allows to loop until being on the same cn
+ // in the 2 dbs
// replogcn : the oldest change from the changelog db
- ChangeNumber replogcn = theOldestChange.getChangeNumber();
- DN replogReplDomDN = clDomCtxts[iDom].rsd.getBaseDn();
+ ChangeNumber cnFromChangelogDb =
+ oldestChange.getUpdateMsg().getChangeNumber();
+ String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn();
while (true)
{
- if (!cLSearchCtxt.endOfSeqnumdbReached)
+ if (!isEndOfDraftCNReached)
{
- // we did not reach yet the end of the seqnumdb
+ // we did not reach yet the end of the DraftCNdb
- // seqnumcn : the next change from from the seqnum db
- ChangeNumber seqnumcn = seqnumDbReadIterator.getChangeNumber();
+ // the next change from the DraftCN db
+ ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber();
+ String dnFromDraftCNDb = draftCNDbIter.getServiceID();
- // are replogcn and seqnumcn should be the same change ?
- int cmp = replogcn.compareTo(seqnumcn);
- DN seqnumReplDomDN=DN.decode(seqnumDbReadIterator.
- getDomainDN());
+ // are replogcn and DraftCNcn should be the same change ?
+ int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb);
+ int areDNEqual = dnFromChangelogDb.compareTo(dnFromDraftCNDb);
- TRACER.debugInfo("seqnumgen: comparing the 2 db "
- + " changelogdb:" + replogReplDomDN + "=" + replogcn
- + " ts=" +
- new Date(replogcn.getTime()).toString()
- + "## seqnumdb:" + seqnumReplDomDN + "=" + seqnumcn
- + " ts=" +
- new Date(seqnumcn.getTime()).toString()
- + " sn older=" + seqnumcn.older(replogcn));
+ if (debugEnabled())
+ TRACER.debugInfo("getNextECLUpdate generating draftCN "
+ + " comparing the 2 db DNs :"
+ + dnFromChangelogDb + "?=" + cnFromChangelogDb
+ + " timestamps:" + new Date(cnFromChangelogDb.getTime())
+ + " ?older" + new Date(cnFromDraftCNDb.getTime()));
- if ((replogReplDomDN.compareTo(seqnumReplDomDN)==0) && (cmp==0))
+ if ((areDNEqual==0) && (areCNEqual==0))
{
// same domain and same CN => same change
- // assign the seqnum from the seqnumdb
- // to the change from the changelogdb
+ // assign the DraftCN found to the change from the changelogdb
+ if (debugEnabled())
+ TRACER.debugInfo("getNextECLUpdate generating draftCN "
+ + " assigning draftCN=" + draftCNDbIter.getDraftCN()
+ + " to change=" + oldestChange);
- TRACER.debugInfo("seqnumgen: assigning seqnum="
- + seqnumDbReadIterator.getSeqnum()
- + " to change=" + theOldestChange);
- theOldestChange.setSeqnum(seqnumDbReadIterator.getSeqnum());
+ oldestChange.setDraftChangeNumber(
+ draftCNDbIter.getDraftCN());
- // prepare the next seqnum for the potential next change added
- // to the seqnumDb
- cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum()
- + 1;
break;
}
else
{
- // replogcn and seqnumcn are NOT the same change
- if (seqnumcn.older(replogcn))
+ // replogcn and DraftCNcn are NOT on the same change
+ if (cnFromDraftCNDb.older(cnFromChangelogDb))
{
- // the change from the seqnumDb is older
+ // the change from the DraftCNDb is older
// that means that the change has been purged from the
- // changelog
+ // changelogDb (and DraftCNdb not yet been trimed)
+
try
{
- // let's traverse the seqnumdb searching for the change
+ // let's traverse the DraftCNdb searching for the change
// found in the changelogDb.
- TRACER.debugInfo("seqnumgen: will skip "
- + seqnumcn + " and next from the seqnum");
- cLSearchCtxt.endOfSeqnumdbReached =
- (seqnumDbReadIterator.next()==false);
- TRACER.debugInfo("seqnumgen: has nexted cr to "
- + " sn=" + seqnumDbReadIterator.getSeqnum()
- + " cn=" + seqnumDbReadIterator.getChangeNumber()
- + " and reached end "
- + " of seqnumdb:"+cLSearchCtxt.endOfSeqnumdbReached);
- if (cLSearchCtxt.endOfSeqnumdbReached)
+ TRACER.debugInfo("getNextECLUpdate generating draftCN "
+ + " will skip " + cnFromDraftCNDb
+ + " and read next change from the DraftCNDb.");
+
+ isEndOfDraftCNReached = (draftCNDbIter.next()==false);
+
+ TRACER.debugInfo("getNextECLUpdate generating draftCN "
+ + " has skiped to "
+ + " sn=" + draftCNDbIter.getDraftCN()
+ + " cn=" + draftCNDbIter.getChangeNumber()
+ + " End of draftCNDb ?"+isEndOfDraftCNReached);
+
+ if (isEndOfDraftCNReached)
{
- // we are at the end of the seqnumdb in the append mode
- // store in seqnumdb the pair
- // seqnum of the cur change,state before this change)
- replicationServerDomain.addSeqnum(
- cLSearchCtxt.nextSeqnum,
- getGenState(),
- clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
- theOldestChange.getChangeNumber());
- theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
- cLSearchCtxt.nextSeqnum++;
+ // we are at the end of the DraftCNdb in the append mode
+
+ // generate a new draftCN and assign to this change
+ oldestChange.setDraftChangeNumber(
+ replicationServer.getNewDraftCN());
+
+ // store in DraftCNdb the pair
+ // (draftCN_of_the_cur_change, state_before_this_change)
+ draftCNDb.add(
+ oldestChange.getDraftChangeNumber(),
+ previousCookie.toString(),
+ oldestChange.getServiceId(),
+ oldestChange.getUpdateMsg().getChangeNumber());
+
break;
}
else
{
- // next change from seqnumdb
- cLSearchCtxt.nextSeqnum =
- seqnumDbReadIterator.getSeqnum() + 1;
+ // let's go to test this new change fro the DraftCNdb
continue;
}
}
@@ -1100,108 +1198,99 @@
// the change from the changelogDb is older
// it should have been stored lately
// let's continue to traverse the changelogdb
- TRACER.debugInfo("seqnumgen: will skip "
- + replogcn + " and next from the CL");
- nextclchange = true;
+ TRACER.debugInfo("getNextECLUpdate: will skip "
+ + cnFromChangelogDb
+ + " and read next from the regular changelog.");
+ continueLooping = true;
break; // TO BE CHECKED
}
}
}
else
{
- // we are at the end of the seqnumdb in the append mode
- // store in seqnumdb the pair
- // (seqnum of the current change, state before this change)
- replicationServerDomain.addSeqnum(
- cLSearchCtxt.nextSeqnum,
- getGenState(),
- clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
- theOldestChange.getChangeNumber());
- theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
- cLSearchCtxt.nextSeqnum++;
+ // we are at the end of the DraftCNdb in the append mode
+ // store in DraftCNdb the pair
+ // (DraftCN of the current change, state before this change)
+ oldestChange.setDraftChangeNumber(
+ replicationServer.getNewDraftCN());
+
+ draftCNDb.add(
+ oldestChange.getDraftChangeNumber(),
+ this.previousCookie.toString(),
+ domainCtxts[iDom].rsd.getBaseDn(),
+ oldestChange.getUpdateMsg().getChangeNumber());
+
break;
}
- } // while seqnum
- } // nextseqnum !- -1
- */
+ } // while DraftCN
+ }
- // here we have the right oldest change and in the seqnum case we
- // have its seqnum
+ // here we have the right oldest change
+ // and in the draft case, we have its draft changenumber
// Set and test the domain of the oldestChange see if we reached
// the end of the phase for this domain
- clDomCtxts[iDom].currentState.update(
- theOldestChange.getUpdateMsg().getChangeNumber());
+ domainCtxts[iDom].currentState.update(
+ oldestChange.getUpdateMsg().getChangeNumber());
- if (clDomCtxts[iDom].currentState.cover(clDomCtxts[iDom].stopState))
+ if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState))
{
- clDomCtxts[iDom].active = false;
+ domainCtxts[iDom].active = false;
}
- // Test the seqnum of the oldestChange see if we reached
- // the end of operation
- /* TODO:ECL G Good changelog draft compat. Not yet implemented
- if ((cLSearchCtxt.stopSeqnum>0) &&
- (theOldestChange.getSeqnum()>=cLSearchCtxt.stopSeqnum))
- {
- closePhase1();
-
- // means end of phase 1 to the calling writer
- return null;
- }
- */
-
- if (clDomCtxts[iDom].active)
+ if (domainCtxts[iDom].active)
{
// populates the table with the next eligible msg from idomain
// in non blocking mode, return null when no more eligible msg
- getNextElligibleMessage(iDom);
+ domainCtxts[iDom].getNextEligibleMessageForDomain(operationId);
}
- } // phase ==1
- } // while (nextclchange)
+ } // phase == INIT_PHASE
+ } // while (...)
- if (cLSearchCtxt.searchPhase==2)
+ if (searchPhase == PERSISTENT_PHASE)
{
- clDomCtxtsToString("In getGeneralizedNextMessage (persistent): " +
- "looking for the generalized oldest change");
+ if (debugEnabled())
+ clDomCtxtsToString("In getNextECLUpdate (persistent): " +
+ "looking for the generalized oldest change");
- for (int ido=0; ido<clDomCtxts.length; ido++)
+ for (int ido=0; ido<domainCtxts.length; ido++)
{
// get next msg
- getNextElligibleMessage(ido);
+ domainCtxts[ido].getNextEligibleMessageForDomain(operationId);
}
// take the oldest one
- iDom = getGeneralizedOldestChange();
+ iDom = getOldestChangeFromDomainCtxts();
if (iDom != -1)
{
- String suffix = this.clDomCtxts[iDom].rsd.getBaseDn();
+ String suffix = this.domainCtxts[iDom].rsd.getBaseDn();
- theOldestChange = new ECLUpdateMsg(
- (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg,
+ oldestChange = new ECLUpdateMsg(
+ (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
null, // set later
- suffix);
+ suffix, 0);
+ domainCtxts[iDom].nextMsg = null; // clean
- clDomCtxts[iDom].currentState.update(
- theOldestChange.getUpdateMsg().getChangeNumber());
+ domainCtxts[iDom].currentState.update(
+ oldestChange.getUpdateMsg().getChangeNumber());
- /* TODO:ECL G Good changelog draft compat.
- if (cLSearchCtxt.nextSeqnum!=-1)
+ if (draftCompat)
{
- // should generate seqnum
+ // should generate DraftCN
+ DraftCNDbHandler draftCNDb =replicationServer.getDraftCNDbHandler();
- // store in seqnumdb the pair
- // (seqnum of the current change, state before this change)
- replicationServerDomain.addSeqnum(
- cLSearchCtxt.nextSeqnum,
- getGenState(),
- clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
- theOldestChange.getChangeNumber());
- theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
- cLSearchCtxt.nextSeqnum++;
+ oldestChange.setDraftChangeNumber(
+ replicationServer.getNewDraftCN());
+
+ // store in DraftCNdb the pair
+ // (DraftCN of the current change, state before this change)
+ draftCNDb.add(
+ oldestChange.getDraftChangeNumber(),
+ this.previousCookie.toString(),
+ domainCtxts[iDom].rsd.getBaseDn(),
+ oldestChange.getUpdateMsg().getChangeNumber());
}
- */
}
}
}
@@ -1214,39 +1303,48 @@
e);
}
- if (theOldestChange != null)
+ if (oldestChange != null)
{
+ if (debugEnabled())
+ TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
+ + oldestChange.getUpdateMsg().getChangeNumber());
+
// Update the current state
- this.cLSearchCtxt.currentCookie.update(
- theOldestChange.getServiceId(),
- theOldestChange.getUpdateMsg().getChangeNumber());
+ previousCookie.update(
+ oldestChange.getServiceId(),
+ oldestChange.getUpdateMsg().getChangeNumber());
// Set the current value of global state in the returned message
- theOldestChange.setCookie(this.cLSearchCtxt.currentCookie);
+ oldestChange.setCookie(previousCookie);
+
+ if (debugEnabled())
+ TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
+ oldestChange);
+
}
- return theOldestChange;
+ return oldestChange;
}
/**
* Terminates the first (non persistent) phase of the search on the ECL.
*/
- private void closePhase1()
+ private void closeInitPhase()
{
// starvation of changelog messages
// all domain have been unactived means are covered
if (debugEnabled())
TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + "," + this + " closePhase1()"
- + " searchCtxt=" + cLSearchCtxt);
+ getMonitorInstanceName() + "," + this + " closeInitPhase(): "
+ + dumpState());
// go to persistent phase if one
- for (int i=0; i<clDomCtxts.length; i++)
- clDomCtxts[i].active = true;
+ for (int i=0; i<domainCtxts.length; i++)
+ domainCtxts[i].active = true;
- if (this.cLSearchCtxt.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
+ if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
{
- // phase = 1 done AND persistent search => goto phase 2
- cLSearchCtxt.searchPhase=2;
+ // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
+ searchPhase = PERSISTENT_PHASE;
if (writer ==null)
{
@@ -1257,269 +1355,53 @@
}
else
{
- // phase = 1 done AND !persistent search => reinit to phase 0
- cLSearchCtxt.searchPhase=0;
+ // INIT_PHASE is done AND search is not persistent => reinit
+ searchPhase = UNDEFINED_PHASE;
}
- /* TODO:ECL G Good changelog draft compat.
- if (seqnumDbReadIterator!=null)
+ if (draftCNDbIter!=null)
{
- // End of phase 1 => always release the seqnum iterator
- seqnumDbReadIterator.releaseCursor();
- seqnumDbReadIterator = null;
+ // End of INIT_PHASE => always release the iterator
+ draftCNDbIter.releaseCursor();
+ draftCNDbIter = null;
}
- */
}
/**
- * Get the oldest change contained in the subx table.
- * The subx table should be populated before
- * @return the oldest change.
+ * Get the index in the domainCtxt table of the domain with the oldest change.
+ * @return the index of the domain with the oldest change, -1 when none.
*/
- private int getGeneralizedOldestChange()
+ private int getOldestChangeFromDomainCtxts()
{
int oldest = -1;
- for (int i=0; i<clDomCtxts.length; i++)
+ for (int i=0; i<domainCtxts.length; i++)
{
- if ((clDomCtxts[i].active))
+ if ((domainCtxts[i].active))
{
// on the first loop, oldest==-1
// .msg is null when the previous (non blocking) nextMessage did
// not have any eligible msg to return
- if (clDomCtxts[i].nextMsg != null)
+ if (domainCtxts[i].nextMsg != null)
{
if ((oldest==-1) ||
- (clDomCtxts[i].nextMsg.compareTo(clDomCtxts[oldest].nextMsg)<0))
+ (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0))
{
oldest = i;
}
}
}
}
+
if (debugEnabled())
TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName()
- + "," + this + " getGeneralizedOldestChange() " +
- " returns " +
- ((oldest!=-1)?clDomCtxts[oldest].nextMsg:""));
+ getMonitorInstanceName()
+ + "," + this + " getOldestChangeFromDomainCtxts() returns " +
+ ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
return oldest;
}
/**
- * Get from the provided domain, the next message elligible regarding
- * the crossDomain elligible CN. Put it in the context table.
- * @param idomain the provided domain.
- */
- private void getNextElligibleMessage(int idomain)
- {
- ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN();
- try
- {
- if (clDomCtxts[idomain].nonElligiblemsg != null)
- {
- TRACER.debugInfo("getNextElligibleMessage tests if the already " +
- " stored nonElligibleMsg has becoem elligible regarding " +
- " the crossDomainElligibleCN ("+crossDomainElligibleCN +
- " ) " +
- clDomCtxts[idomain].nonElligiblemsg.getChangeNumber().
- older(crossDomainElligibleCN));
- // we already got the oldest msg and it was not elligible
- if (clDomCtxts[idomain].nonElligiblemsg.getChangeNumber().
- older(crossDomainElligibleCN))
- {
- // it is now elligible
- clDomCtxts[idomain].nextMsg = clDomCtxts[idomain].nonElligiblemsg;
- clDomCtxts[idomain].nonElligiblemsg = null;
- }
- else
- {
- // the oldest is still not elligible - let's wait next
- }
- }
- else
- {
- // non blocking
- UpdateMsg newMsg = clDomCtxts[idomain].mh.getnextMessage(false);
- if (debugEnabled())
- TRACER.debugInfo(this +
- " getNextElligibleMessage got the next changelogmsg "
- + " from " + clDomCtxts[idomain].mh.getServiceId()
- + " newCLMsg=" + newMsg);
- clDomCtxts[idomain].nextMsg =
- clDomCtxts[idomain].nonElligiblemsg = null;
- // in non blocking mode, return null when no more msg
- if (newMsg != null)
- {
- /* TODO:ECL Take into account eligibility.
- TRACER.debugInfo("getNextElligibleMessage is "
- + newMsg.getChangeNumber()
- + new Date(newMsg.getChangeNumber().getTime()).toString()
- + " elligible "
- + newMsg.getChangeNumber().older(crossDomainElligibleCN));
- if (newMsg.getChangeNumber().older(crossDomainElligibleCN))
- {
- // is elligible
- clDomCtxts[idomain].nextMsg = newMsg;
- }
- else
- {
- // is not elligible
- clDomCtxts[idomain].nonElligiblemsg = newMsg;
- }
- */
- clDomCtxts[idomain].nextMsg = newMsg;
- }
- }
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
-
- /*
- */
- ECLEligibleCNComputerThread eligibleCNComputerThread = null;
- ChangeNumber liveecn;
- private ChangeNumber computeCrossDomainElligibleCN()
- {
- return liveecn;
- }
-
-
- /**
- * This class specifies the thread that computes periodically
- * the cross domain eligible CN.
- */
- private final class ECLEligibleCNComputerThread
- extends DirectoryThread
- {
- /**
- * The tracer object for the debug logger.
- */
- private boolean shutdown = false;
-
- private ECLEligibleCNComputerThread()
- {
- super("ECL eligible CN computer thread");
- start();
- }
-
- public void run()
- {
- while (shutdown == false)
- {
- try {
- synchronized (this)
- {
- liveecn = computeNewCrossDomainElligibleCN();
- try
- {
- this.wait(1000);
- } catch (InterruptedException e)
- { }
- }
- } catch (Exception end)
- {
- break;
- }
- }
- }
-
- private ChangeNumber computeNewCrossDomainElligibleCN()
- {
- ChangeNumber computedCrossDomainElligibleCN = null;
- String s = "=> ";
-
- ReplicationServer rs = replicationServerDomain.getReplicationServer();
-
- if (debugEnabled())
- TRACER.debugInfo("ECLSH.computeNewCrossDomainElligibleCN() "
- + " periodic starts rs="+rs);
-
- Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator();
- if (rsdi != null)
- {
- while (rsdi.hasNext())
- {
- ReplicationServerDomain domain = rsdi.next();
- if (domain.getBaseDn().equalsIgnoreCase("cn=changelog"))
- continue;
-
- ChangeNumber domainElligibleCN = computeEligibleCN(domain);
- if (domainElligibleCN==null)
- continue;
- if ((computedCrossDomainElligibleCN == null) ||
- (domainElligibleCN.older(computedCrossDomainElligibleCN)))
- {
- computedCrossDomainElligibleCN = domainElligibleCN;
- }
- s += "\n DN:" + domain.getBaseDn()
- + "\t\t domainElligibleCN :" + domainElligibleCN
- + "/" +
- new Date(domainElligibleCN.getTime()).toString();
- }
- }
-
- if (debugEnabled())
- TRACER.debugInfo("SH.computeNewCrossDomainElligibleCN() periodic " +
- " ends with " +
- " the following domainElligibleCN for each domain :" + s +
- "\n thus CrossDomainElligibleCN=" + computedCrossDomainElligibleCN +
- " ts=" +
- new Date(computedCrossDomainElligibleCN.getTime()).toString());
-
- return computedCrossDomainElligibleCN;
- }
- }
-
- /**
- * Compute the eligible CN.
- * @param rsd The provided replication server domain for which we want
- * to retrieve the eligible date.
- * @return null if the domain does not play in eligibility.
- */
- public ChangeNumber computeEligibleCN(ReplicationServerDomain rsd)
- {
- ChangeNumber elligibleCN = null;
- ServerState heartbeatState = rsd.getHeartbeatState();
- if (heartbeatState==null)
- return null;
-
- // compute elligible CN
- ServerState hbState = heartbeatState.duplicate();
-
- Iterator<Short> it = hbState.iterator();
- while (it.hasNext())
- {
- short sid = it.next();
- ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
-
- // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
- // then the server is considered down and not considered for eligibility
- if (TimeThread.getTime()-storedCN.getTime()>2000)
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "For RSD." + rsd.getBaseDn() + " Server " + sid
- + " is not considered for eligibility ... potentially down");
- continue;
- }
-
- if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
- {
- elligibleCN = storedCN;
- }
- }
-
- if (debugEnabled())
- TRACER.debugInfo(
- "For RSD." + rsd.getBaseDn() + " ElligibleCN()=" + elligibleCN);
- return elligibleCN;
- }
-
- /**
* Returns the client operation id.
* @return The client operation id.
*/
@@ -1533,7 +1415,7 @@
* @return Whether the current search is persistent or not.
*/
public short isPersistent() {
- return this.cLSearchCtxt.isPersistent;
+ return this.isPersistent;
}
/**
@@ -1541,7 +1423,137 @@
* @return Whether the current search is persistent or not.
*/
public int getSearchPhase() {
- return this.cLSearchCtxt.searchPhase;
+ return this.searchPhase;
}
+ /**
+ * Refresh the eligibleCN by requesting the replication server.
+ */
+ public void refreshEligibleCN()
+ {
+ eligibleCN = replicationServer.getEligibleCN();
+ }
+
+ /*
+ * Get first and last DraftCN
+ * @param crossDomainEligibleCN
+ * @return
+ */
+ private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN)
+ throws DirectoryException
+ {
+ /* The content of the DraftCNdb depends on the SEARCH operations done before
+ * requesting the DraftCN. If no operations, DraftCNdb is empty.
+ * The limits we want to get are the "potential" limits if a request was
+ * done, the DraftCNdb is probably not complete to do that.
+ *
+ * The first DraftCN is :
+ * - the first record from the DraftCNdb
+ * - if none because DraftCNdb empty,
+ * then
+ * if no change in replchangelog then return 0
+ * else return 1 (DraftCN that WILL be returned to next search)
+ *
+ * The last DraftCN is :
+ * - initialized with the last record from the DraftCNdb (0 if none)
+ * and consider the genState associated
+ * - to the last DraftCN, we add the count of updates in the replchangelog
+ * FROM that genState TO the crossDomainEligibleCN
+ * (this diff is done domain by domain)
+ */
+
+ int firstDraftCN;
+ int lastDraftCN;
+ boolean DraftCNdbIsEmpty;
+ DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler();
+
+ ReplicationServer rs = replicationServerDomain.getReplicationServer();
+
+ // Get the first DraftCN from the DraftCNdb
+ firstDraftCN = draftCNDbH.getFirstKey();
+ HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
+ if (firstDraftCN < 1)
+ {
+ DraftCNdbIsEmpty=true;
+ firstDraftCN = 0;
+ lastDraftCN = 0;
+ }
+ else
+ {
+ DraftCNdbIsEmpty=false;
+
+ // Get the last DraftCN from the DraftCNdb
+ lastDraftCN = draftCNDbH.getLastKey();
+
+ // Get the generalized state associated with the current last DraftCN
+ // and initializes from it the startStates table
+ String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
+ if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
+ {
+ domainsServerStateForLastSeqnum = MultiDomainServerState.
+ splitGenStateToServerStates(lastSeqnumGenState);
+ }
+ }
+
+ // Domain by domain
+ Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
+ if (rsdi != null)
+ {
+ while (rsdi.hasNext())
+ {
+ // process a domain
+ ReplicationServerDomain rsd = rsdi.next();
+
+ if (isServiceIDExcluded(rsd.getBaseDn()))
+ continue;
+
+ // for this domain, have the state in the replchangelog
+ // where the last DraftCN update is
+ ServerState domainServerStateForLastSeqnum;
+ if ((domainsServerStateForLastSeqnum == null) ||
+ (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
+ {
+ domainServerStateForLastSeqnum = new ServerState();
+ }
+ else
+ {
+ domainServerStateForLastSeqnum =
+ domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
+ }
+
+ // Count the number of (eligible) changes from this place
+ // to the eligible CN (cross server)
+ long ec = rsd.getEligibleCount(
+ domainServerStateForLastSeqnum, crossDomainEligibleCN);
+
+ // ... hum ...
+ if ((ec>0) && (DraftCNdbIsEmpty==false))
+ ec--;
+
+ // cumulates on domains
+ lastDraftCN += ec;
+
+ // DraftCN is empty and there are eligible updates in the repl changelog
+ // then init first DraftCN
+ if ((ec>0) && (firstDraftCN==0))
+ firstDraftCN = 1;
+ }
+ }
+ return new int[]{firstDraftCN, lastDraftCN};
+ }
+
+ private boolean isServiceIDExcluded(String serviceID)
+ {
+ // skip the excluded domains
+ boolean excluded = false;
+ for(String excludedServiceID : this.excludedServiceIDs)
+ {
+ if (excludedServiceID.equalsIgnoreCase(serviceID))
+ {
+ excluded=true;
+ break;
+ }
+ }
+ return excluded;
+ }
}
--
Gitblit v1.10.0