| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | |
| | | public class ECLServerHandler extends ServerHandler |
| | | { |
| | | |
| | | // Properties filled only if remote server is a RS |
| | | private String serverAddressURL; |
| | | |
| | | // This is a string identifying the operation, provided by the client part |
| | | // of the ECL, used to help interpretation of messages logged. |
| | | String operationId; |
| | | |
| | | // Iterator on the draftCN database. |
| | | private DraftCNDbIterator draftCNDbIter = null; |
| | | |
| | | boolean draftCompat = false; |
| | | /** |
| | | * CLDomainContext : contains the state properties for the search |
| | | * currently being processed, by replication domain. |
| | | * Specifies the last draft changer number (seqnum) requested. |
| | | */ |
| | | private class CLDomainContext |
| | | public int lastDraftCN = 0; |
| | | /** |
| | | * Specifies whether the draft change number (seqnum) db has been read until |
| | | * its end. |
| | | */ |
| | | public boolean isEndOfDraftCNReached = false; |
| | | /** |
| | | * Specifies whether the current search has been requested to be persistent |
| | | * or not. |
| | | */ |
| | | public short isPersistent; |
| | | /** |
| | | * Specifies the current search phase : INIT or PERSISTENT. |
| | | */ |
| | | public int searchPhase = INIT_PHASE; |
| | | /** |
| | | * Specifies the cookie contained in the request, specifying where |
| | | * to start serving the ECL. |
| | | */ |
| | | public String startCookie; |
| | | /** |
| | | * Specifies the value of the cookie before the change currently processed |
| | | * is returned. It is updated with the change number of the change |
| | | * currently processed (thus becoming the "current" cookie just |
| | | * before the change is returned. |
| | | */ |
| | | public MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(); |
| | | /** |
| | | * Specifies the excluded DNs (like cn=admin, ...). |
| | | */ |
| | | public ArrayList<String> excludedServiceIDs = new ArrayList<String>(); |
| | | |
| | | /** |
| | | * Eligible changeNumber - only changes older or equal to eligibleCN |
| | | * are published in the ECL. |
| | | */ |
| | | public ChangeNumber eligibleCN = null; |
| | | |
| | | /** |
| | | * Provides a string representation of this object. |
| | | * @return the string representation. |
| | | */ |
| | | public String dumpState() |
| | | { |
| | | ReplicationServerDomain rsd; // the repl server domain |
| | | boolean active; // is the domain still active |
| | | MessageHandler mh; // the message handler associated |
| | | UpdateMsg nextMsg; |
| | | UpdateMsg nonElligiblemsg; |
| | | return new String( |
| | | this.getClass().getCanonicalName() + |
| | | "[" + |
| | | "[draftCompat=" + draftCompat + |
| | | "] [persistent=" + isPersistent + |
| | | "] [lastDraftCN=" + lastDraftCN + |
| | | "] [isEndOfDraftCNReached=" + isEndOfDraftCNReached + |
| | | "] [searchPhase=" + searchPhase + |
| | | "] [startCookie=" + startCookie + |
| | | "] [previousCookie=" + previousCookie + |
| | | "]]"); |
| | | } |
| | | |
| | | /** |
| | | * Class that manages the 'by domain' state variables for the search being |
| | | * currently processed on the ECL. |
| | | * For example : |
| | | * if search on 'cn=changelog' is being processed when 2 replicated domains |
| | | * dc=us and dc=europe are configured, then there will be 2 DomainContext |
| | | * used, one for ds=us, and one for dc=europe. |
| | | */ |
| | | private class DomainContext |
| | | { |
| | | ReplicationServerDomain rsd; |
| | | |
| | | boolean active; // active when there are still changes |
| | | // supposed eligible for the ECL. |
| | | |
| | | MessageHandler mh; // the message handler from which are read |
| | | // the changes for this domain |
| | | private UpdateMsg nextMsg; |
| | | private UpdateMsg nextNonEligibleMsg; |
| | | ServerState startState; |
| | | ServerState currentState; |
| | | ServerState stopState; |
| | | |
| | | /** |
| | | * Add to the provider buffer a string representation of this object. |
| | | * {@inheritDoc} |
| | | */ |
| | | public void toString(StringBuilder buffer, int i) |
| | | @Override |
| | | public String toString() |
| | | { |
| | | CLDomainContext xx = clDomCtxts[i]; |
| | | StringBuilder buffer = new StringBuilder(); |
| | | toString(buffer); |
| | | return buffer.toString(); |
| | | } |
| | | /** |
| | | * Provide a string representation of this object for debug purpose.. |
| | | */ |
| | | public void toString(StringBuilder buffer) |
| | | { |
| | | buffer.append( |
| | | " clDomCtxts(" + i + ") [act=" + xx.active + |
| | | " rsd=" + rsd + |
| | | " nextMsg=" + nextMsg + "(" + |
| | | "[ [active=" + active + |
| | | "] [rsd=" + rsd + |
| | | "] [nextMsg=" + nextMsg + "(" + |
| | | (nextMsg != null? |
| | | new Date(nextMsg.getChangeNumber().getTime()).toString():"") |
| | | + ")" + |
| | | " nextNonEligibleMsg=" + nonElligiblemsg + |
| | | " startState=" + startState + |
| | | " stopState= " + stopState + |
| | | " currState= " + currentState + "]"); |
| | | "] [nextNonEligibleMsg=" + nextNonEligibleMsg + |
| | | "] [startState=" + startState + |
| | | "] [stopState= " + stopState + |
| | | "] [currentState= " + currentState + "]]"); |
| | | } |
| | | |
| | | /** |
| | | * Get the next message elligible regarding |
| | | * the crossDomain elligible CN. Put it in the context table. |
| | | * @param opid The operation id. |
| | | */ |
| | | private void getNextEligibleMessageForDomain(String opid) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "ctxt=" + toString()); |
| | | |
| | | assert(nextMsg == null); |
| | | try |
| | | { |
| | | // Before get a new message from the domain, evaluate in priority |
| | | // a message that has not been published to the ECL because it was |
| | | // not eligible |
| | | if (nextNonEligibleMsg != null) |
| | | { |
| | | boolean hasBecomeEligible = |
| | | (nextNonEligibleMsg.getChangeNumber().getTime() |
| | | <= eligibleCN.getTime()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + " stored nonEligibleMsg " + nextNonEligibleMsg |
| | | + " has now become eligible regarding " |
| | | + " the eligibleCN ("+ eligibleCN |
| | | + " ):" + hasBecomeEligible); |
| | | |
| | | if (hasBecomeEligible) |
| | | { |
| | | // it is now elligible |
| | | nextMsg = nextNonEligibleMsg; |
| | | nextNonEligibleMsg = null; |
| | | } |
| | | else |
| | | { |
| | | // the oldest is still not elligible - let's wait next |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // Here comes a new message !!! |
| | | // non blocking |
| | | UpdateMsg newMsg = mh.getnextMessage(false); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + " got new message : " |
| | | + " serviceId=[" + mh.getServiceId() |
| | | + "] [newMsg=" + newMsg + "]" + dumpState()); |
| | | |
| | | // in non blocking mode, return null when no more msg |
| | | if (newMsg != null) |
| | | { |
| | | boolean isEligible = (newMsg.getChangeNumber().getTime() |
| | | <= eligibleCN.getTime()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() |
| | | + " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "newMsg isEligible=" + isEligible + " since " |
| | | + "newMsg=[" + newMsg.getChangeNumber() |
| | | + " " + new Date(newMsg.getChangeNumber().getTime()).toString() |
| | | + "] eligibleCN=[" + eligibleCN |
| | | + " " + new Date(eligibleCN.getTime()).toString()+"]" |
| | | + dumpState()); |
| | | |
| | | if (isEligible) |
| | | { |
| | | nextMsg = newMsg; |
| | | } |
| | | else |
| | | { |
| | | nextNonEligibleMsg = newMsg; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // The list of contexts by domain for the current search |
| | | CLDomainContext[] clDomCtxts = new CLDomainContext[0]; |
| | | // The global list of contexts by domain for the search currently processed. |
| | | DomainContext[] domainCtxts = new DomainContext[0]; |
| | | |
| | | private void clDomCtxtsToString(String msg) |
| | | private String clDomCtxtsToString(String msg) |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | buffer.append(msg+"\n"); |
| | | for (int i=0;i<clDomCtxts.length;i++) |
| | | for (int i=0;i<domainCtxts.length;i++) |
| | | { |
| | | clDomCtxts[i].toString(buffer, i); |
| | | domainCtxts[i].toString(buffer); |
| | | buffer.append("\n"); |
| | | } |
| | | TRACER.debugInfo( |
| | | "In " + this.getName() + " clDomCtxts: " + buffer.toString()); |
| | | return buffer.toString(); |
| | | } |
| | | |
| | | /** |
| | | * Class that manages the state variables for the current search on the ECL. |
| | | */ |
| | | private class CLTraverseCtxt |
| | | { |
| | | /** |
| | | * Specifies the next changer number (seqnum), -1 when not. |
| | | */ |
| | | public int nextSeqnum; |
| | | /** |
| | | * Specifies whether the current search has been requested to be persistent |
| | | * or not. |
| | | */ |
| | | public short isPersistent; |
| | | /** |
| | | * Specifies the last changer number (seqnum) requested. |
| | | */ |
| | | public int stopSeqnum; |
| | | /** |
| | | * Specifies whether the change number (seqnum) db has been read until |
| | | * its end. |
| | | */ |
| | | public boolean endOfSeqnumdbReached = false; |
| | | /** |
| | | * Specifies the current search phase. |
| | | * 1 = init |
| | | * 2 = persistent |
| | | */ |
| | | public int searchPhase = 1; |
| | | /** |
| | | * Specifies the cookie contained in the request, specifying where |
| | | * to start serving the ECL. |
| | | */ |
| | | public String generalizedStartState; |
| | | /** |
| | | * Specifies the current cookie value. |
| | | */ |
| | | public MultiDomainServerState currentCookie = |
| | | new MultiDomainServerState(); |
| | | /** |
| | | * Specifies the excluded DNs. |
| | | */ |
| | | public ArrayList<String> excludedServiceIDs = new ArrayList<String>(); |
| | | |
| | | /** |
| | | * Provides a string representation of this object. |
| | | * @return the string representation. |
| | | */ |
| | | public String toString() |
| | | { |
| | | return new String( |
| | | this.getClass().getCanonicalName() + |
| | | ":[" + |
| | | " nextSeqnum=" + nextSeqnum + |
| | | " persistent=" + isPersistent + |
| | | " stopSeqnum" + stopSeqnum + |
| | | " endOfSeqnumdbReached=" + endOfSeqnumdbReached + |
| | | " searchPhase=" + searchPhase + |
| | | " generalizedStartState=" + generalizedStartState + |
| | | "]"); |
| | | } |
| | | |
| | | } |
| | | |
| | | // The context of the current search |
| | | private CLTraverseCtxt cLSearchCtxt = new CLTraverseCtxt(); |
| | | static int UNDEFINED_PHASE = 0; |
| | | static int INIT_PHASE = 1; |
| | | static int PERSISTENT_PHASE = 2; |
| | | |
| | | /** |
| | | * Starts this handler based on a start message received from remote server. |
| | |
| | | inECLStartMsg.getVersion()); |
| | | generationId = inECLStartMsg.getGenerationId(); |
| | | serverURL = inECLStartMsg.getServerURL(); |
| | | int separator = serverURL.lastIndexOf(':'); |
| | | serverAddressURL = |
| | | session.getRemoteAddress() + ":" + serverURL.substring(separator + |
| | | 1); |
| | | setInitialServerState(inECLStartMsg.getServerState()); |
| | | setSendWindowSize(inECLStartMsg.getWindowSize()); |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | |
| | | // Only V2 protocol has the group id in repl server start message |
| | | this.groupId = inECLStartMsg.getGroupId(); |
| | | } |
| | | // FIXME:ECL Any generationID must be removed, it makes no sense here. |
| | | oldGenerationId = -100; |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | replicationServer, rcvWindowSize); |
| | | try |
| | | { |
| | | setServiceIdAndDomain("cn=changelog"); |
| | | setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | StartECLSessionMsg startECLSessionMsg) |
| | | throws DirectoryException |
| | | { |
| | | // FIXME:ECL queueSize is hard coded to 1 else Handler hangs for some reason |
| | | // queueSize is hard coded to 1 else super class hangs for some reason |
| | | super(null, 1, replicationServerURL, replicationServerId, |
| | | replicationServer, 0); |
| | | try |
| | | { |
| | | setServiceIdAndDomain("cn=changelog"); |
| | | setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | |
| | | /** |
| | | * Initialize the handler from a provided cookie value. |
| | | * @param providedGeneralizedStartState The provided cookie value. |
| | | * @param crossDomainStartState The provided cookie value. |
| | | * @throws DirectoryException When an error is raised. |
| | | */ |
| | | public void initializeCLSearchFromGenState( |
| | | String providedGeneralizedStartState) |
| | | public void initializeCLSearchFromGenState(String crossDomainStartState) |
| | | throws DirectoryException |
| | | { |
| | | this.cLSearchCtxt.nextSeqnum = -1; // will not generate seqnum |
| | | initializeCLDomCtxts(providedGeneralizedStartState); |
| | | initializeCLDomCtxts(crossDomainStartState); |
| | | } |
| | | |
| | | /** |
| | | * Initialize the handler from a provided draft first change number. |
| | | * @param startDraftCN The provided draft first change number. |
| | | * @throws DirectoryException When an error is raised. |
| | | */ |
| | | public void initializeCLSearchFromDraftCN(int startDraftCN) |
| | | throws DirectoryException |
| | | { |
| | | String crossDomainStartState; |
| | | |
| | | draftCompat = true; |
| | | |
| | | DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler(); |
| | | if (startDraftCN < 0) |
| | | { |
| | | // Request filter does not contain any firstDraftCN |
| | | // So we'll generate from the beginning of what we have stored here. |
| | | |
| | | // Get the first DraftCN from DraftCNdb |
| | | if (draftCNDb.count() == 0) |
| | | { |
| | | // db is empty |
| | | isEndOfDraftCNReached = true; |
| | | crossDomainStartState = null; |
| | | } |
| | | else |
| | | { |
| | | // get the generalizedServerState related to the start of the draftDb |
| | | crossDomainStartState = draftCNDb.getValue(draftCNDb.getFirstKey()); |
| | | |
| | | // Get an iterator to traverse the draftCNDb |
| | | try |
| | | { |
| | | draftCNDbIter = |
| | | draftCNDb.generateIterator(draftCNDb.getFirstKey()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | |
| | | if (draftCNDbIter != null) |
| | | draftCNDbIter.releaseCursor(); |
| | | |
| | | throw new DirectoryException( |
| | | ResultCode.OPERATIONS_ERROR, |
| | | Message.raw(Category.SYNC, |
| | | Severity.FATAL_ERROR,"Server Error.")); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // Request filter does contain a startDraftCN |
| | | |
| | | // Read the draftCNDb to see whether it contains startDraftCN |
| | | crossDomainStartState = draftCNDb.getValue(startDraftCN); |
| | | |
| | | if (crossDomainStartState != null) |
| | | { |
| | | // startDraftCN is present in the draftCnDb |
| | | // Get an iterator to traverse the draftCNDb |
| | | try |
| | | { |
| | | draftCNDbIter = |
| | | draftCNDb.generateIterator(draftCNDb.getFirstKey()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | |
| | | if (draftCNDbIter != null) |
| | | draftCNDbIter.releaseCursor(); |
| | | |
| | | throw new DirectoryException( |
| | | ResultCode.OPERATIONS_ERROR, |
| | | Message.raw(Category.SYNC, |
| | | Severity.FATAL_ERROR,"Server Error.")); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // startDraftCN provided in the request is not present in the draftCnDb |
| | | // Is the provided startDraftCN <= the potential last DraftCNdb |
| | | |
| | | // Get the draftLimits (from the eligibleCN got at the beginning of |
| | | // the operation. |
| | | int[] limits = getECLDraftCNLimits(eligibleCN); |
| | | |
| | | if (startDraftCN<=limits[1]) |
| | | { |
| | | // startDraftCN is between first and last and has never been |
| | | // returned yet |
| | | crossDomainStartState = draftCNDb.getValue(draftCNDb.getLastKey()); |
| | | // FIXME:ECL ... ok we'll start from the end of the draftCNDb BUT ... |
| | | // this is NOT the request of the client !!!! |
| | | } |
| | | else |
| | | { |
| | | throw new DirectoryException( |
| | | ResultCode.SUCCESS, |
| | | Message.raw(Category.SYNC, |
| | | Severity.INFORMATION,"Bad value provided for change number " |
| | | + " Failed to match a replication state to "+startDraftCN)); |
| | | } |
| | | } |
| | | } |
| | | this.draftCompat = true; |
| | | |
| | | initializeCLDomCtxts(crossDomainStartState); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Initialize the context for each domain. |
| | | * @param providedGeneralizedStartState the provided generalized state |
| | | * @param providedCookie the provided generalized state |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeCLDomCtxts(String providedGeneralizedStartState) |
| | | public void initializeCLDomCtxts(String providedCookie) |
| | | throws DirectoryException |
| | | { |
| | | HashMap<String,ServerState> startStates = new HashMap<String,ServerState>(); |
| | | |
| | | ReplicationServer rs = replicationServerDomain.getReplicationServer(); |
| | | |
| | | try |
| | | { |
| | | // Initialize start state for all running domains with empty state |
| | | Iterator<ReplicationServerDomain> rsdk = rs.getCacheIterator(); |
| | | if (rsdk != null) |
| | | { |
| | | while (rsdk.hasNext()) |
| | | { |
| | | // process a domain |
| | | ReplicationServerDomain rsd = rsdk.next(); |
| | | // skip the changelog domain |
| | | if (rsd == this.replicationServerDomain) |
| | | continue; |
| | | startStates.put(rsd.getBaseDn(), new ServerState()); |
| | | } |
| | | } |
| | | |
| | | // Overwrite start state from the cookie provided in the request |
| | | if ((providedGeneralizedStartState != null) && |
| | | (providedGeneralizedStartState.length()>0)) |
| | | { |
| | | String[] domains = providedGeneralizedStartState.split(";"); |
| | | for (String domainState : domains) |
| | | { |
| | | // Split baseDN and serverState |
| | | String[] fields = domainState.split(":"); |
| | | |
| | | // BaseDN - Check it |
| | | String domainBaseDNReceived = fields[0]; |
| | | if (!startStates.containsKey(domainBaseDNReceived)) |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( |
| | | "unknown " + domainBaseDNReceived)); |
| | | |
| | | // ServerState |
| | | ServerState domainServerState = new ServerState(); |
| | | if (fields.length>1) |
| | | { |
| | | String strState = fields[1]; |
| | | String[] strCN = strState.split(" "); |
| | | for (String sr : strCN) |
| | | { |
| | | ChangeNumber fromChangeNumber = new ChangeNumber(sr); |
| | | domainServerState.update(fromChangeNumber); |
| | | } |
| | | } |
| | | startStates.put(domainBaseDNReceived, domainServerState); |
| | | |
| | | // FIXME: ECL first cookie value check |
| | | // ECL For each of the provided state, it this state is older |
| | | // than the older change stored in the replication changelog .... |
| | | // then a purge occured since the time the cookie was published |
| | | // it is recommended to do a full resync |
| | | ReplicationServerDomain rsd = |
| | | rs.getReplicationServerDomain(domainBaseDNReceived, false); |
| | | ServerState domainStartState = rsd.getStartState(); |
| | | if (!domainServerState.cover(domainStartState)) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( |
| | | "too old cookie provided " + providedGeneralizedStartState |
| | | + " first acceptable change for " + rsd.getBaseDn() |
| | | + " is " + rsd.getStartState())); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | throw de; |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( |
| | | "Exception raised: " + e.getMessage())); |
| | | } |
| | | // Parse the provided cookie and overwrite startState from it. |
| | | if ((providedCookie != null) && (providedCookie.length()!=0)) |
| | | startStates = |
| | | MultiDomainServerState.splitGenStateToServerStates(providedCookie); |
| | | |
| | | try |
| | | { |
| | | // Now traverse all domains and build the initial changelog context |
| | | Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator(); |
| | | // Now traverse all domains and build all the initial contexts : |
| | | // - the global one : dumpState() |
| | | // - the domain by domain ones : domainCtxts |
| | | Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator(); |
| | | |
| | | // Creates the table that will contain the real-time info by domain. |
| | | clDomCtxts = new CLDomainContext[rs.getCacheSize()-1 |
| | | -this.cLSearchCtxt.excludedServiceIDs.size()]; |
| | | HashSet<DomainContext> tmpSet = new HashSet<DomainContext>(); |
| | | int i =0; |
| | | if (rsdi != null) |
| | | { |
| | |
| | | continue; |
| | | |
| | | // skip the excluded domains |
| | | boolean excluded = false; |
| | | for(String excludedServiceID : this.cLSearchCtxt.excludedServiceIDs) |
| | | { |
| | | if (excludedServiceID.equalsIgnoreCase(rsd.getBaseDn())) |
| | | { |
| | | excluded=true; |
| | | break; |
| | | } |
| | | } |
| | | if (excluded) |
| | | if (isServiceIDExcluded(rsd.getBaseDn())) |
| | | continue; |
| | | |
| | | // Creates the context record |
| | | CLDomainContext newContext = new CLDomainContext(); |
| | | newContext.active = true; |
| | | newContext.rsd = rsd; |
| | | // Creates the new domain context |
| | | DomainContext newDomainCtxt = new DomainContext(); |
| | | newDomainCtxt.active = true; |
| | | newDomainCtxt.rsd = rsd; |
| | | |
| | | if (this.cLSearchCtxt.isPersistent == |
| | | // Assign the start state for the domain |
| | | if (isPersistent == |
| | | StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newContext.startState = rsd.getCLElligibleState(); |
| | | newDomainCtxt.startState = rsd.getEligibleState(eligibleCN); |
| | | } |
| | | else |
| | | { |
| | | newContext.startState = startStates.get(rsd.getBaseDn()); |
| | | newContext.stopState = rsd.getCLElligibleState(); |
| | | newDomainCtxt.startState = startStates.remove(rsd.getBaseDn()); |
| | | if ((providedCookie==null)||(providedCookie.isEmpty())) |
| | | newDomainCtxt.startState = new ServerState(); |
| | | else |
| | | if (newDomainCtxt.startState == null) |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( |
| | | "missing " + rsd.getBaseDn())); |
| | | newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN); |
| | | } |
| | | newContext.currentState = new ServerState(); |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | | // Creates an unconnected SH |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, |
| | | replicationServerURL, replicationServerId, replicationServer); |
| | | // set initial state |
| | | mh.setInitialServerState(newContext.startState); |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | // set serviceID and domain |
| | | mh.setServiceIdAndDomain(rsd.getBaseDn()); |
| | | // register into domain |
| | | // register the unconnected into the domain |
| | | rsd.registerHandler(mh); |
| | | newContext.mh = mh; |
| | | newDomainCtxt.mh = mh; |
| | | |
| | | previousCookie.update( |
| | | newDomainCtxt.rsd.getBaseDn(), |
| | | newDomainCtxt.startState); |
| | | |
| | | // store the new context |
| | | clDomCtxts[i] = newContext; |
| | | tmpSet.add(newDomainCtxt); |
| | | i++; |
| | | } |
| | | } |
| | | |
| | | // the next record from the seqnumdb should be the one |
| | | cLSearchCtxt.endOfSeqnumdbReached = false; |
| | | cLSearchCtxt.generalizedStartState = providedGeneralizedStartState; |
| | | |
| | | // Initializes all domain with the next elligible message |
| | | for (int j=0; j<clDomCtxts.length; j++) |
| | | if (!startStates.isEmpty()) |
| | | { |
| | | this.getNextElligibleMessage(j); |
| | | if (clDomCtxts[j].nextMsg == null) |
| | | clDomCtxts[j].active = false; |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( |
| | | "unknown " + startStates.toString())); |
| | | } |
| | | domainCtxts = tmpSet.toArray(new DomainContext[0]); |
| | | |
| | | // the next record from the DraftCNdb should be the one |
| | | startCookie = providedCookie; |
| | | |
| | | // Initializes all domain with the next(first) elligible message |
| | | for (int j=0; j<domainCtxts.length; j++) |
| | | { |
| | | domainCtxts[j].getNextEligibleMessageForDomain(operationId); |
| | | |
| | | if (domainCtxts[j].nextMsg == null) |
| | | domainCtxts[j].active = false; |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | // FIXME:ECL do not publish internal exception plumb to the client |
| | | throw new DirectoryException( |
| | | ResultCode.OPERATIONS_ERROR, |
| | | Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " + |
| | | e.getLocalizedMessage()), |
| | | e); |
| | | e), |
| | | e); |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | " initializeCLDomCtxts ends with " + " " + dumpState()); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this handler ServerHandler. |
| | | * Shutdown this handler. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | for (int i=0;i<clDomCtxts.length;i++) |
| | | for (int i=0;i<domainCtxts.length;i++) |
| | | { |
| | | if (!clDomCtxts[i].rsd.unRegisterHandler(clDomCtxts[i].mh)) |
| | | if (!domainCtxts[i].rsd.unRegisterHandler(domainCtxts[i].mh)) |
| | | { |
| | | TRACER.debugInfo(this +" shutdown() Internal error " + |
| | | " when unregistering "+ clDomCtxts[i].mh); |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | this +" shutdown() - error when unregistering handler " |
| | | + domainCtxts[i].mh)); |
| | | } |
| | | clDomCtxts[i].rsd.stopServer(clDomCtxts[i].mh); |
| | | domainCtxts[i].rsd.stopServer(domainCtxts[i].mh); |
| | | } |
| | | super.shutdown(); |
| | | clDomCtxts = null; |
| | | domainCtxts = null; |
| | | } |
| | | |
| | | /** |
| | |
| | | attributes.add(Attributes.create("External-Changelog-Server", |
| | | serverURL)); |
| | | |
| | | // FIXME:ECL No monitoring exist for ECL. |
| | | // TODO:ECL No monitoring exist for ECL. |
| | | return attributes; |
| | | } |
| | | /** |
| | |
| | | { |
| | | String localString; |
| | | localString = "External changelog Server "; |
| | | if (this.cLSearchCtxt==null) |
| | | localString += serverId + " " + serverURL + " " + getServiceId(); |
| | | if (this.serverId != 0) |
| | | localString += serverId + " " + serverURL + " " + getServiceId() |
| | | + " " + this.getOperationId(); |
| | | else |
| | | localString += this.getName(); |
| | | return localString; |
| | |
| | | */ |
| | | public ServerStatus getStatus() |
| | | { |
| | | // FIXME:ECL Sould ECLServerHandler manage a ServerStatus ? |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | /** |
| | | * Retrieves the Address URL for this server handler. |
| | | * |
| | | * @return The Address URL for this server handler, |
| | | * in the form of an IP address and port separated by a colon. |
| | | */ |
| | | public String getServerAddressURL() |
| | | { |
| | | return serverAddressURL; |
| | | // There is no other status possible for the ECL Server Handler to |
| | | // be normally connected. |
| | | return ServerStatus.NORMAL_STATUS; |
| | | } |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | { |
| | | |
| | | // |
| | | this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ? |
| | | this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ? |
| | | this.setConsumerActive(true); |
| | | this.cLSearchCtxt.searchPhase = 1; |
| | | //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ? |
| | | //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ? |
| | | //this.setConsumerActive(true); |
| | | |
| | | this.operationId = startECLSessionMsg.getOperationId(); |
| | | this.setName(this.getClass().getCanonicalName()+ " " + operationId); |
| | | |
| | | if (eligibleCNComputerThread==null) |
| | | eligibleCNComputerThread = new ECLEligibleCNComputerThread(); |
| | | |
| | | cLSearchCtxt.isPersistent = startECLSessionMsg.isPersistent(); |
| | | cLSearchCtxt.stopSeqnum = startECLSessionMsg.getLastDraftChangeNumber(); |
| | | cLSearchCtxt.searchPhase = 1; |
| | | cLSearchCtxt.currentCookie = new MultiDomainServerState( |
| | | isPersistent = startECLSessionMsg.isPersistent(); |
| | | lastDraftCN = startECLSessionMsg.getLastDraftChangeNumber(); |
| | | searchPhase = INIT_PHASE; |
| | | previousCookie = new MultiDomainServerState( |
| | | startECLSessionMsg.getCrossDomainServerState()); |
| | | cLSearchCtxt.excludedServiceIDs=startECLSessionMsg.getExcludedServiceIDs(); |
| | | excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs(); |
| | | replicationServer.disableEligibility(excludedServiceIDs); |
| | | eligibleCN = replicationServer.getEligibleCN(); |
| | | |
| | | //-- |
| | | if (startECLSessionMsg.getECLRequestType()==0) |
| | | if (startECLSessionMsg.getECLRequestType()== |
| | | StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE) |
| | | { |
| | | initializeCLSearchFromGenState( |
| | | startECLSessionMsg.getCrossDomainServerState()); |
| | | } |
| | | else if (startECLSessionMsg.getECLRequestType()== |
| | | StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER) |
| | | { |
| | | initializeCLSearchFromDraftCN( |
| | | startECLSessionMsg.getFirstDraftChangeNumber()); |
| | | } |
| | | |
| | | if (session != null) |
| | | { |
| | |
| | | // Resume the writer |
| | | ((ECLServerWriter)writer).resumeWriter(); |
| | | |
| | | // FIXME:ECL Potential race condition if writer not yet resumed here |
| | | // TODO:ECL Potential race condition if writer not yet resumed here |
| | | } |
| | | |
| | | if (cLSearchCtxt.isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) |
| | | if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | closePhase1(); |
| | | closeInitPhase(); |
| | | } |
| | | |
| | | /* TODO: Good Draft Compat |
| | | //-- |
| | | if (startCLMsg.getStartMode()==1) |
| | | { |
| | | initializeCLSearchFromProvidedSeqnum(startCLMsg.getSequenceNumber()); |
| | | } |
| | | |
| | | /* TODO: From replication changenumber |
| | | //-- |
| | | if (startCLMsg.getStartMode()==2) |
| | | { |
| | |
| | | { |
| | | // to get the CL first and last |
| | | initializeCLDomCtxts(null); // from start |
| | | ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN(); |
| | | ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN(); |
| | | |
| | | try |
| | | { |
| | | // to get the CL first and last |
| | | // last rely on the crossDomainElligibleCN thhus must have been |
| | | // last rely on the crossDomainEligibleCN thhus must have been |
| | | // computed before |
| | | int[] limits = computeCLLimits(crossDomainElligibleCN); |
| | | int[] limits = computeCLLimits(crossDomainEligibleCN); |
| | | // Send the response |
| | | CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]); |
| | | session.publish(msg); |
| | |
| | | } |
| | | return; |
| | | } |
| | | Good Draft Compat */ |
| | | */ |
| | | |
| | | // Store into domain |
| | | registerIntoDomain(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | this.getName() + " initialized: " + |
| | | " " + dumpState() + " " + |
| | | " " + clDomCtxtsToString("")); |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | throws DirectoryException |
| | | { |
| | | boolean interrupted = true; |
| | | ECLUpdateMsg msg = getnextUpdate(); |
| | | ECLUpdateMsg msg = getNextECLUpdate(); |
| | | |
| | | // FIXME:ECL We should refactor so that a SH always have a session |
| | | // TODO:ECL We should refactor so that a SH always have a session |
| | | if (session == null) |
| | | return msg; |
| | | |
| | | /* |
| | | * When we remove a message from the queue we need to check if another |
| | | * server is waiting in flow control because this queue was too long. |
| | | * This check might cause a performance penalty an therefore it |
| | | * is not done for every message removed but only every few messages. |
| | | */ |
| | | /** FIXME:ECL checkAllSaturation makes no sense for ECLServerHandler ? |
| | | if (++saturationCount > 10) |
| | | { |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | replicationServerDomain.checkAllSaturation(); |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | } |
| | | */ |
| | | boolean acquired = false; |
| | | do |
| | | { |
| | |
| | | |
| | | /** |
| | | * Get the next message - non blocking - null when none. |
| | | * |
| | | * @param synchronous - not used - always non blocking. |
| | | * @return the next message - null when none. |
| | | * This method is currently not used but we don't want to keep the mother |
| | | * class method since it make no sense for ECLServerHandler. |
| | | * @param synchronous - not used |
| | | * @return the next message |
| | | */ |
| | | protected UpdateMsg getnextMessage(boolean synchronous) |
| | | { |
| | | UpdateMsg msg = null; |
| | | try |
| | | { |
| | | ECLUpdateMsg eclMsg = getnextUpdate(); |
| | | ECLUpdateMsg eclMsg = getNextECLUpdate(); |
| | | if (eclMsg!=null) |
| | | msg = eclMsg.getUpdateMsg(); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the next external changelog update. |
| | | * |
| | | * @return The ECL update, null when none. |
| | | * @exception DirectoryException when any problem occurs. |
| | | * Returns the next update message for the External Changelog (ECL). |
| | | * @return the ECL update message, null when there aren't anymore. |
| | | * @throws DirectoryException when an error occurs. |
| | | */ |
| | | protected ECLUpdateMsg getnextUpdate() |
| | | public ECLUpdateMsg getNextECLUpdate() |
| | | throws DirectoryException |
| | | { |
| | | return getGeneralizedNextECLUpdate(this.cLSearchCtxt); |
| | | } |
| | | ECLUpdateMsg oldestChange = null; |
| | | |
| | | /** |
| | | * Computes the cross domain eligible message (non blocking). |
| | | * Return null when search is covered |
| | | */ |
| | | private ECLUpdateMsg getGeneralizedNextECLUpdate(CLTraverseCtxt cLSearchCtxt) |
| | | throws DirectoryException |
| | | { |
| | | ECLUpdateMsg theOldestChange = null; |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + "," + this + |
| | | " getGeneralizedNextECLUpdate starts with ctxt=" |
| | | + cLSearchCtxt); |
| | | " getNextECLUpdate starts: " + dumpState()); |
| | | |
| | | if ((cLSearchCtxt.nextSeqnum != -1) && |
| | | (!cLSearchCtxt.endOfSeqnumdbReached)) |
| | | { |
| | | /* TODO:ECL G Good changelog draft compat. |
| | | // First time , initialise the cursor to traverse the seqnumdb |
| | | if (seqnumDbReadIterator == null) |
| | | { |
| | | try |
| | | { |
| | | seqnumDbReadIterator = replicationServerDomain.getReplicationServer(). |
| | | getSeqnumDbHandler().generateIterator(cLSearchCtxt.nextSeqnum); |
| | | TRACER.debugInfo("getGeneralizedNextMessage(): " |
| | | + " creates seqnumDbReadIterator from nextSeqnum=" |
| | | + cLSearchCtxt.nextSeqnum |
| | | + " 1rst=" + seqnumDbReadIterator.getSeqnum() |
| | | + " CN=" + seqnumDbReadIterator.getChangeNumber() |
| | | + cLSearchCtxt); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | cLSearchCtxt.endOfSeqnumdbReached = true; |
| | | } |
| | | } |
| | | */ |
| | | } |
| | | try |
| | | { |
| | | |
| | | |
| | | // Search / no seqnum / not persistent |
| | | // Search / no DraftCN / not persistent |
| | | // ----------------------------------- |
| | | // init: all domain are candidate |
| | | // get one msg from each |
| | |
| | | // if one domain has no msg, still is candidate |
| | | |
| | | int iDom = 0; |
| | | boolean nextclchange = true; |
| | | while ((nextclchange) && (cLSearchCtxt.searchPhase==1)) |
| | | boolean continueLooping = true; |
| | | while ((continueLooping) && (searchPhase == INIT_PHASE)) |
| | | { |
| | | |
| | | // Step 1 & 2 |
| | | if (cLSearchCtxt.searchPhase==1) |
| | | if (searchPhase == INIT_PHASE) |
| | | { |
| | | if (debugEnabled()) |
| | | clDomCtxtsToString("In getGeneralizedNextMessage : " + |
| | | "looking for the generalized oldest change"); |
| | | // Normally we whould not loop .. except ... |
| | | continueLooping = false; |
| | | |
| | | // Retrieves the index in the subx table of the |
| | | // generalizedOldestChange |
| | | iDom = getGeneralizedOldestChange(); |
| | | iDom = getOldestChangeFromDomainCtxts(); |
| | | |
| | | // idomain != -1 means that we got one |
| | | // generalized oldest change to process |
| | | if (iDom==-1) |
| | | // iDom == -1 means that there is no oldest change to process |
| | | if (iDom == -1) |
| | | { |
| | | closePhase1(); |
| | | closeInitPhase(); |
| | | |
| | | // signify end of phase 1 to the caller |
| | | // signals end of phase 1 to the caller |
| | | return null; |
| | | } |
| | | |
| | | // idomain != -1 means that we got one |
| | | // generalized oldest change to process |
| | | String suffix = this.clDomCtxts[iDom].rsd.getBaseDn(); |
| | | theOldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg, |
| | | null, // set later |
| | | suffix); |
| | | // Build the ECLUpdateMsg to be returned |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)domainCtxts[iDom].nextMsg, |
| | | null, // cookie will be set later |
| | | domainCtxts[iDom].rsd.getBaseDn(), |
| | | 0); // draftChangeNumber may be set later |
| | | domainCtxts[iDom].nextMsg = null; |
| | | |
| | | nextclchange = false; |
| | | |
| | | /* TODO:ECL G Good change log draft compat. |
| | | if (cLSearchCtxt.nextSeqnum!=-1) |
| | | if (draftCompat) |
| | | { |
| | | // Should either retrieve or generate a seqnum |
| | | // we also need to check if the seqnumdb is acccurate reagrding |
| | | // either retrieve a draftCN from the draftCNDb |
| | | // or assign a new draftCN and store in the db |
| | | |
| | | DraftCNDbHandler draftCNDb=replicationServer.getDraftCNDbHandler(); |
| | | |
| | | // We also need to check if the draftCNdb is consistent with |
| | | // the changelogdb. |
| | | // if not, 2 potential reasons |
| | | // -1 : purge from the changelog .. let's traverse the seqnumdb |
| | | // -2 : changelog is late .. let's traverse the changelog |
| | | // a/ : changelog has been purged (trim)let's traverse the draftCNDb |
| | | // b/ : changelog is late .. let's traverse the changelogDb |
| | | // The following loop allows to loop until being on the same cn |
| | | // in the 2 dbs |
| | | |
| | | // replogcn : the oldest change from the changelog db |
| | | ChangeNumber replogcn = theOldestChange.getChangeNumber(); |
| | | DN replogReplDomDN = clDomCtxts[iDom].rsd.getBaseDn(); |
| | | ChangeNumber cnFromChangelogDb = |
| | | oldestChange.getUpdateMsg().getChangeNumber(); |
| | | String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn(); |
| | | |
| | | while (true) |
| | | { |
| | | if (!cLSearchCtxt.endOfSeqnumdbReached) |
| | | if (!isEndOfDraftCNReached) |
| | | { |
| | | // we did not reach yet the end of the seqnumdb |
| | | // we did not reach yet the end of the DraftCNdb |
| | | |
| | | // seqnumcn : the next change from from the seqnum db |
| | | ChangeNumber seqnumcn = seqnumDbReadIterator.getChangeNumber(); |
| | | // the next change from the DraftCN db |
| | | ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber(); |
| | | String dnFromDraftCNDb = draftCNDbIter.getServiceID(); |
| | | |
| | | // are replogcn and seqnumcn should be the same change ? |
| | | int cmp = replogcn.compareTo(seqnumcn); |
| | | DN seqnumReplDomDN=DN.decode(seqnumDbReadIterator. |
| | | getDomainDN()); |
| | | // are replogcn and DraftCNcn should be the same change ? |
| | | int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb); |
| | | int areDNEqual = dnFromChangelogDb.compareTo(dnFromDraftCNDb); |
| | | |
| | | TRACER.debugInfo("seqnumgen: comparing the 2 db " |
| | | + " changelogdb:" + replogReplDomDN + "=" + replogcn |
| | | + " ts=" + |
| | | new Date(replogcn.getTime()).toString() |
| | | + "## seqnumdb:" + seqnumReplDomDN + "=" + seqnumcn |
| | | + " ts=" + |
| | | new Date(seqnumcn.getTime()).toString() |
| | | + " sn older=" + seqnumcn.older(replogcn)); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " comparing the 2 db DNs :" |
| | | + dnFromChangelogDb + "?=" + cnFromChangelogDb |
| | | + " timestamps:" + new Date(cnFromChangelogDb.getTime()) |
| | | + " ?older" + new Date(cnFromDraftCNDb.getTime())); |
| | | |
| | | if ((replogReplDomDN.compareTo(seqnumReplDomDN)==0) && (cmp==0)) |
| | | if ((areDNEqual==0) && (areCNEqual==0)) |
| | | { |
| | | // same domain and same CN => same change |
| | | |
| | | // assign the seqnum from the seqnumdb |
| | | // to the change from the changelogdb |
| | | // assign the DraftCN found to the change from the changelogdb |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " assigning draftCN=" + draftCNDbIter.getDraftCN() |
| | | + " to change=" + oldestChange); |
| | | |
| | | TRACER.debugInfo("seqnumgen: assigning seqnum=" |
| | | + seqnumDbReadIterator.getSeqnum() |
| | | + " to change=" + theOldestChange); |
| | | theOldestChange.setSeqnum(seqnumDbReadIterator.getSeqnum()); |
| | | oldestChange.setDraftChangeNumber( |
| | | draftCNDbIter.getDraftCN()); |
| | | |
| | | // prepare the next seqnum for the potential next change added |
| | | // to the seqnumDb |
| | | cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum() |
| | | + 1; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // replogcn and seqnumcn are NOT the same change |
| | | if (seqnumcn.older(replogcn)) |
| | | // replogcn and DraftCNcn are NOT on the same change |
| | | if (cnFromDraftCNDb.older(cnFromChangelogDb)) |
| | | { |
| | | // the change from the seqnumDb is older |
| | | // the change from the DraftCNDb is older |
| | | // that means that the change has been purged from the |
| | | // changelog |
| | | // changelogDb (and DraftCNdb not yet been trimed) |
| | | |
| | | try |
| | | { |
| | | // let's traverse the seqnumdb searching for the change |
| | | // let's traverse the DraftCNdb searching for the change |
| | | // found in the changelogDb. |
| | | TRACER.debugInfo("seqnumgen: will skip " |
| | | + seqnumcn + " and next from the seqnum"); |
| | | cLSearchCtxt.endOfSeqnumdbReached = |
| | | (seqnumDbReadIterator.next()==false); |
| | | TRACER.debugInfo("seqnumgen: has nexted cr to " |
| | | + " sn=" + seqnumDbReadIterator.getSeqnum() |
| | | + " cn=" + seqnumDbReadIterator.getChangeNumber() |
| | | + " and reached end " |
| | | + " of seqnumdb:"+cLSearchCtxt.endOfSeqnumdbReached); |
| | | if (cLSearchCtxt.endOfSeqnumdbReached) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " will skip " + cnFromDraftCNDb |
| | | + " and read next change from the DraftCNDb."); |
| | | |
| | | isEndOfDraftCNReached = (draftCNDbIter.next()==false); |
| | | |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " has skiped to " |
| | | + " sn=" + draftCNDbIter.getDraftCN() |
| | | + " cn=" + draftCNDbIter.getChangeNumber() |
| | | + " End of draftCNDb ?"+isEndOfDraftCNReached); |
| | | |
| | | if (isEndOfDraftCNReached) |
| | | { |
| | | // we are at the end of the seqnumdb in the append mode |
| | | // store in seqnumdb the pair |
| | | // seqnum of the cur change,state before this change) |
| | | replicationServerDomain.addSeqnum( |
| | | cLSearchCtxt.nextSeqnum, |
| | | getGenState(), |
| | | clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), |
| | | theOldestChange.getChangeNumber()); |
| | | theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); |
| | | cLSearchCtxt.nextSeqnum++; |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | |
| | | // generate a new draftCN and assign to this change |
| | | oldestChange.setDraftChangeNumber( |
| | | replicationServer.getNewDraftCN()); |
| | | |
| | | // store in DraftCNdb the pair |
| | | // (draftCN_of_the_cur_change, state_before_this_change) |
| | | draftCNDb.add( |
| | | oldestChange.getDraftChangeNumber(), |
| | | previousCookie.toString(), |
| | | oldestChange.getServiceId(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // next change from seqnumdb |
| | | cLSearchCtxt.nextSeqnum = |
| | | seqnumDbReadIterator.getSeqnum() + 1; |
| | | // let's go to test this new change fro the DraftCNdb |
| | | continue; |
| | | } |
| | | } |
| | |
| | | // the change from the changelogDb is older |
| | | // it should have been stored lately |
| | | // let's continue to traverse the changelogdb |
| | | TRACER.debugInfo("seqnumgen: will skip " |
| | | + replogcn + " and next from the CL"); |
| | | nextclchange = true; |
| | | TRACER.debugInfo("getNextECLUpdate: will skip " |
| | | + cnFromChangelogDb |
| | | + " and read next from the regular changelog."); |
| | | continueLooping = true; |
| | | break; // TO BE CHECKED |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // we are at the end of the seqnumdb in the append mode |
| | | // store in seqnumdb the pair |
| | | // (seqnum of the current change, state before this change) |
| | | replicationServerDomain.addSeqnum( |
| | | cLSearchCtxt.nextSeqnum, |
| | | getGenState(), |
| | | clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), |
| | | theOldestChange.getChangeNumber()); |
| | | theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); |
| | | cLSearchCtxt.nextSeqnum++; |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | // store in DraftCNdb the pair |
| | | // (DraftCN of the current change, state before this change) |
| | | oldestChange.setDraftChangeNumber( |
| | | replicationServer.getNewDraftCN()); |
| | | |
| | | draftCNDb.add( |
| | | oldestChange.getDraftChangeNumber(), |
| | | this.previousCookie.toString(), |
| | | domainCtxts[iDom].rsd.getBaseDn(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | break; |
| | | } |
| | | } // while seqnum |
| | | } // nextseqnum !- -1 |
| | | */ |
| | | } // while DraftCN |
| | | } |
| | | |
| | | // here we have the right oldest change and in the seqnum case we |
| | | // have its seqnum |
| | | // here we have the right oldest change |
| | | // and in the draft case, we have its draft changenumber |
| | | |
| | | // Set and test the domain of the oldestChange see if we reached |
| | | // the end of the phase for this domain |
| | | clDomCtxts[iDom].currentState.update( |
| | | theOldestChange.getUpdateMsg().getChangeNumber()); |
| | | domainCtxts[iDom].currentState.update( |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | if (clDomCtxts[iDom].currentState.cover(clDomCtxts[iDom].stopState)) |
| | | if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState)) |
| | | { |
| | | clDomCtxts[iDom].active = false; |
| | | domainCtxts[iDom].active = false; |
| | | } |
| | | |
| | | // Test the seqnum of the oldestChange see if we reached |
| | | // the end of operation |
| | | /* TODO:ECL G Good changelog draft compat. Not yet implemented |
| | | if ((cLSearchCtxt.stopSeqnum>0) && |
| | | (theOldestChange.getSeqnum()>=cLSearchCtxt.stopSeqnum)) |
| | | { |
| | | closePhase1(); |
| | | |
| | | // means end of phase 1 to the calling writer |
| | | return null; |
| | | } |
| | | */ |
| | | |
| | | if (clDomCtxts[iDom].active) |
| | | if (domainCtxts[iDom].active) |
| | | { |
| | | // populates the table with the next eligible msg from idomain |
| | | // in non blocking mode, return null when no more eligible msg |
| | | getNextElligibleMessage(iDom); |
| | | domainCtxts[iDom].getNextEligibleMessageForDomain(operationId); |
| | | } |
| | | } // phase ==1 |
| | | } // while (nextclchange) |
| | | } // phase == INIT_PHASE |
| | | } // while (...) |
| | | |
| | | if (cLSearchCtxt.searchPhase==2) |
| | | if (searchPhase == PERSISTENT_PHASE) |
| | | { |
| | | clDomCtxtsToString("In getGeneralizedNextMessage (persistent): " + |
| | | "looking for the generalized oldest change"); |
| | | if (debugEnabled()) |
| | | clDomCtxtsToString("In getNextECLUpdate (persistent): " + |
| | | "looking for the generalized oldest change"); |
| | | |
| | | for (int ido=0; ido<clDomCtxts.length; ido++) |
| | | for (int ido=0; ido<domainCtxts.length; ido++) |
| | | { |
| | | // get next msg |
| | | getNextElligibleMessage(ido); |
| | | domainCtxts[ido].getNextEligibleMessageForDomain(operationId); |
| | | } |
| | | |
| | | // take the oldest one |
| | | iDom = getGeneralizedOldestChange(); |
| | | iDom = getOldestChangeFromDomainCtxts(); |
| | | |
| | | if (iDom != -1) |
| | | { |
| | | String suffix = this.clDomCtxts[iDom].rsd.getBaseDn(); |
| | | String suffix = this.domainCtxts[iDom].rsd.getBaseDn(); |
| | | |
| | | theOldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg, |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)domainCtxts[iDom].nextMsg, |
| | | null, // set later |
| | | suffix); |
| | | suffix, 0); |
| | | domainCtxts[iDom].nextMsg = null; // clean |
| | | |
| | | clDomCtxts[iDom].currentState.update( |
| | | theOldestChange.getUpdateMsg().getChangeNumber()); |
| | | domainCtxts[iDom].currentState.update( |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | /* TODO:ECL G Good changelog draft compat. |
| | | if (cLSearchCtxt.nextSeqnum!=-1) |
| | | if (draftCompat) |
| | | { |
| | | // should generate seqnum |
| | | // should generate DraftCN |
| | | DraftCNDbHandler draftCNDb =replicationServer.getDraftCNDbHandler(); |
| | | |
| | | // store in seqnumdb the pair |
| | | // (seqnum of the current change, state before this change) |
| | | replicationServerDomain.addSeqnum( |
| | | cLSearchCtxt.nextSeqnum, |
| | | getGenState(), |
| | | clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), |
| | | theOldestChange.getChangeNumber()); |
| | | theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); |
| | | cLSearchCtxt.nextSeqnum++; |
| | | oldestChange.setDraftChangeNumber( |
| | | replicationServer.getNewDraftCN()); |
| | | |
| | | // store in DraftCNdb the pair |
| | | // (DraftCN of the current change, state before this change) |
| | | draftCNDb.add( |
| | | oldestChange.getDraftChangeNumber(), |
| | | this.previousCookie.toString(), |
| | | domainCtxts[iDom].rsd.getBaseDn(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | } |
| | | */ |
| | | } |
| | | } |
| | | } |
| | |
| | | e); |
| | | } |
| | | |
| | | if (theOldestChange != null) |
| | | if (oldestChange != null) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" |
| | | + oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | // Update the current state |
| | | this.cLSearchCtxt.currentCookie.update( |
| | | theOldestChange.getServiceId(), |
| | | theOldestChange.getUpdateMsg().getChangeNumber()); |
| | | previousCookie.update( |
| | | oldestChange.getServiceId(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | // Set the current value of global state in the returned message |
| | | theOldestChange.setCookie(this.cLSearchCtxt.currentCookie); |
| | | oldestChange.setCookie(previousCookie); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate returns result oldest change =" + |
| | | oldestChange); |
| | | |
| | | } |
| | | return theOldestChange; |
| | | return oldestChange; |
| | | } |
| | | |
| | | /** |
| | | * Terminates the first (non persistent) phase of the search on the ECL. |
| | | */ |
| | | private void closePhase1() |
| | | private void closeInitPhase() |
| | | { |
| | | // starvation of changelog messages |
| | | // all domain have been unactived means are covered |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + "," + this + " closePhase1()" |
| | | + " searchCtxt=" + cLSearchCtxt); |
| | | getMonitorInstanceName() + "," + this + " closeInitPhase(): " |
| | | + dumpState()); |
| | | |
| | | // go to persistent phase if one |
| | | for (int i=0; i<clDomCtxts.length; i++) |
| | | clDomCtxts[i].active = true; |
| | | for (int i=0; i<domainCtxts.length; i++) |
| | | domainCtxts[i].active = true; |
| | | |
| | | if (this.cLSearchCtxt.isPersistent != StartECLSessionMsg.NON_PERSISTENT) |
| | | if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT) |
| | | { |
| | | // phase = 1 done AND persistent search => goto phase 2 |
| | | cLSearchCtxt.searchPhase=2; |
| | | // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE |
| | | searchPhase = PERSISTENT_PHASE; |
| | | |
| | | if (writer ==null) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | // phase = 1 done AND !persistent search => reinit to phase 0 |
| | | cLSearchCtxt.searchPhase=0; |
| | | // INIT_PHASE is done AND search is not persistent => reinit |
| | | searchPhase = UNDEFINED_PHASE; |
| | | } |
| | | |
| | | /* TODO:ECL G Good changelog draft compat. |
| | | if (seqnumDbReadIterator!=null) |
| | | if (draftCNDbIter!=null) |
| | | { |
| | | // End of phase 1 => always release the seqnum iterator |
| | | seqnumDbReadIterator.releaseCursor(); |
| | | seqnumDbReadIterator = null; |
| | | // End of INIT_PHASE => always release the iterator |
| | | draftCNDbIter.releaseCursor(); |
| | | draftCNDbIter = null; |
| | | } |
| | | */ |
| | | } |
| | | |
| | | /** |
| | | * Get the oldest change contained in the subx table. |
| | | * The subx table should be populated before |
| | | * @return the oldest change. |
| | | * Get the index in the domainCtxt table of the domain with the oldest change. |
| | | * @return the index of the domain with the oldest change, -1 when none. |
| | | */ |
| | | private int getGeneralizedOldestChange() |
| | | private int getOldestChangeFromDomainCtxts() |
| | | { |
| | | int oldest = -1; |
| | | for (int i=0; i<clDomCtxts.length; i++) |
| | | for (int i=0; i<domainCtxts.length; i++) |
| | | { |
| | | if ((clDomCtxts[i].active)) |
| | | if ((domainCtxts[i].active)) |
| | | { |
| | | // on the first loop, oldest==-1 |
| | | // .msg is null when the previous (non blocking) nextMessage did |
| | | // not have any eligible msg to return |
| | | if (clDomCtxts[i].nextMsg != null) |
| | | if (domainCtxts[i].nextMsg != null) |
| | | { |
| | | if ((oldest==-1) || |
| | | (clDomCtxts[i].nextMsg.compareTo(clDomCtxts[oldest].nextMsg)<0)) |
| | | (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0)) |
| | | { |
| | | oldest = i; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() |
| | | + "," + this + " getGeneralizedOldestChange() " + |
| | | " returns " + |
| | | ((oldest!=-1)?clDomCtxts[oldest].nextMsg:"")); |
| | | getMonitorInstanceName() |
| | | + "," + this + " getOldestChangeFromDomainCtxts() returns " + |
| | | ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1")); |
| | | |
| | | return oldest; |
| | | } |
| | | |
| | | /** |
| | | * Get from the provided domain, the next message elligible regarding |
| | | * the crossDomain elligible CN. Put it in the context table. |
| | | * @param idomain the provided domain. |
| | | */ |
| | | private void getNextElligibleMessage(int idomain) |
| | | { |
| | | ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN(); |
| | | try |
| | | { |
| | | if (clDomCtxts[idomain].nonElligiblemsg != null) |
| | | { |
| | | TRACER.debugInfo("getNextElligibleMessage tests if the already " + |
| | | " stored nonElligibleMsg has becoem elligible regarding " + |
| | | " the crossDomainElligibleCN ("+crossDomainElligibleCN + |
| | | " ) " + |
| | | clDomCtxts[idomain].nonElligiblemsg.getChangeNumber(). |
| | | older(crossDomainElligibleCN)); |
| | | // we already got the oldest msg and it was not elligible |
| | | if (clDomCtxts[idomain].nonElligiblemsg.getChangeNumber(). |
| | | older(crossDomainElligibleCN)) |
| | | { |
| | | // it is now elligible |
| | | clDomCtxts[idomain].nextMsg = clDomCtxts[idomain].nonElligiblemsg; |
| | | clDomCtxts[idomain].nonElligiblemsg = null; |
| | | } |
| | | else |
| | | { |
| | | // the oldest is still not elligible - let's wait next |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // non blocking |
| | | UpdateMsg newMsg = clDomCtxts[idomain].mh.getnextMessage(false); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + |
| | | " getNextElligibleMessage got the next changelogmsg " |
| | | + " from " + clDomCtxts[idomain].mh.getServiceId() |
| | | + " newCLMsg=" + newMsg); |
| | | clDomCtxts[idomain].nextMsg = |
| | | clDomCtxts[idomain].nonElligiblemsg = null; |
| | | // in non blocking mode, return null when no more msg |
| | | if (newMsg != null) |
| | | { |
| | | /* TODO:ECL Take into account eligibility. |
| | | TRACER.debugInfo("getNextElligibleMessage is " |
| | | + newMsg.getChangeNumber() |
| | | + new Date(newMsg.getChangeNumber().getTime()).toString() |
| | | + " elligible " |
| | | + newMsg.getChangeNumber().older(crossDomainElligibleCN)); |
| | | if (newMsg.getChangeNumber().older(crossDomainElligibleCN)) |
| | | { |
| | | // is elligible |
| | | clDomCtxts[idomain].nextMsg = newMsg; |
| | | } |
| | | else |
| | | { |
| | | // is not elligible |
| | | clDomCtxts[idomain].nonElligiblemsg = newMsg; |
| | | } |
| | | */ |
| | | clDomCtxts[idomain].nextMsg = newMsg; |
| | | } |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | */ |
| | | ECLEligibleCNComputerThread eligibleCNComputerThread = null; |
| | | ChangeNumber liveecn; |
| | | private ChangeNumber computeCrossDomainElligibleCN() |
| | | { |
| | | return liveecn; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This class specifies the thread that computes periodically |
| | | * the cross domain eligible CN. |
| | | */ |
| | | private final class ECLEligibleCNComputerThread |
| | | extends DirectoryThread |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private boolean shutdown = false; |
| | | |
| | | private ECLEligibleCNComputerThread() |
| | | { |
| | | super("ECL eligible CN computer thread"); |
| | | start(); |
| | | } |
| | | |
| | | public void run() |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | | try { |
| | | synchronized (this) |
| | | { |
| | | liveecn = computeNewCrossDomainElligibleCN(); |
| | | try |
| | | { |
| | | this.wait(1000); |
| | | } catch (InterruptedException e) |
| | | { } |
| | | } |
| | | } catch (Exception end) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private ChangeNumber computeNewCrossDomainElligibleCN() |
| | | { |
| | | ChangeNumber computedCrossDomainElligibleCN = null; |
| | | String s = "=> "; |
| | | |
| | | ReplicationServer rs = replicationServerDomain.getReplicationServer(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ECLSH.computeNewCrossDomainElligibleCN() " |
| | | + " periodic starts rs="+rs); |
| | | |
| | | Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator(); |
| | | if (rsdi != null) |
| | | { |
| | | while (rsdi.hasNext()) |
| | | { |
| | | ReplicationServerDomain domain = rsdi.next(); |
| | | if (domain.getBaseDn().equalsIgnoreCase("cn=changelog")) |
| | | continue; |
| | | |
| | | ChangeNumber domainElligibleCN = computeEligibleCN(domain); |
| | | if (domainElligibleCN==null) |
| | | continue; |
| | | if ((computedCrossDomainElligibleCN == null) || |
| | | (domainElligibleCN.older(computedCrossDomainElligibleCN))) |
| | | { |
| | | computedCrossDomainElligibleCN = domainElligibleCN; |
| | | } |
| | | s += "\n DN:" + domain.getBaseDn() |
| | | + "\t\t domainElligibleCN :" + domainElligibleCN |
| | | + "/" + |
| | | new Date(domainElligibleCN.getTime()).toString(); |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("SH.computeNewCrossDomainElligibleCN() periodic " + |
| | | " ends with " + |
| | | " the following domainElligibleCN for each domain :" + s + |
| | | "\n thus CrossDomainElligibleCN=" + computedCrossDomainElligibleCN + |
| | | " ts=" + |
| | | new Date(computedCrossDomainElligibleCN.getTime()).toString()); |
| | | |
| | | return computedCrossDomainElligibleCN; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Compute the eligible CN. |
| | | * @param rsd The provided replication server domain for which we want |
| | | * to retrieve the eligible date. |
| | | * @return null if the domain does not play in eligibility. |
| | | */ |
| | | public ChangeNumber computeEligibleCN(ReplicationServerDomain rsd) |
| | | { |
| | | ChangeNumber elligibleCN = null; |
| | | ServerState heartbeatState = rsd.getHeartbeatState(); |
| | | if (heartbeatState==null) |
| | | return null; |
| | | |
| | | // compute elligible CN |
| | | ServerState hbState = heartbeatState.duplicate(); |
| | | |
| | | Iterator<Short> it = hbState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | short sid = it.next(); |
| | | ChangeNumber storedCN = hbState.getMaxChangeNumber(sid); |
| | | |
| | | // If the most recent UpdateMsg or CLHeartbeatMsg received is very old |
| | | // then the server is considered down and not considered for eligibility |
| | | if (TimeThread.getTime()-storedCN.getTime()>2000) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "For RSD." + rsd.getBaseDn() + " Server " + sid |
| | | + " is not considered for eligibility ... potentially down"); |
| | | continue; |
| | | } |
| | | |
| | | if ((elligibleCN == null) || (storedCN.older(elligibleCN))) |
| | | { |
| | | elligibleCN = storedCN; |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "For RSD." + rsd.getBaseDn() + " ElligibleCN()=" + elligibleCN); |
| | | return elligibleCN; |
| | | } |
| | | |
| | | /** |
| | | * Returns the client operation id. |
| | | * @return The client operation id. |
| | | */ |
| | |
| | | * @return Whether the current search is persistent or not. |
| | | */ |
| | | public short isPersistent() { |
| | | return this.cLSearchCtxt.isPersistent; |
| | | return this.isPersistent; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return Whether the current search is persistent or not. |
| | | */ |
| | | public int getSearchPhase() { |
| | | return this.cLSearchCtxt.searchPhase; |
| | | return this.searchPhase; |
| | | } |
| | | |
| | | /** |
| | | * Refresh the eligibleCN by requesting the replication server. |
| | | */ |
| | | public void refreshEligibleCN() |
| | | { |
| | | eligibleCN = replicationServer.getEligibleCN(); |
| | | } |
| | | |
| | | /* |
| | | * Get first and last DraftCN |
| | | * @param crossDomainEligibleCN |
| | | * @return |
| | | */ |
| | | private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN) |
| | | throws DirectoryException |
| | | { |
| | | /* The content of the DraftCNdb depends on the SEARCH operations done before |
| | | * requesting the DraftCN. If no operations, DraftCNdb is empty. |
| | | * The limits we want to get are the "potential" limits if a request was |
| | | * done, the DraftCNdb is probably not complete to do that. |
| | | * |
| | | * The first DraftCN is : |
| | | * - the first record from the DraftCNdb |
| | | * - if none because DraftCNdb empty, |
| | | * then |
| | | * if no change in replchangelog then return 0 |
| | | * else return 1 (DraftCN that WILL be returned to next search) |
| | | * |
| | | * The last DraftCN is : |
| | | * - initialized with the last record from the DraftCNdb (0 if none) |
| | | * and consider the genState associated |
| | | * - to the last DraftCN, we add the count of updates in the replchangelog |
| | | * FROM that genState TO the crossDomainEligibleCN |
| | | * (this diff is done domain by domain) |
| | | */ |
| | | |
| | | int firstDraftCN; |
| | | int lastDraftCN; |
| | | boolean DraftCNdbIsEmpty; |
| | | DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler(); |
| | | |
| | | ReplicationServer rs = replicationServerDomain.getReplicationServer(); |
| | | |
| | | // Get the first DraftCN from the DraftCNdb |
| | | firstDraftCN = draftCNDbH.getFirstKey(); |
| | | HashMap<String,ServerState> domainsServerStateForLastSeqnum = null; |
| | | if (firstDraftCN < 1) |
| | | { |
| | | DraftCNdbIsEmpty=true; |
| | | firstDraftCN = 0; |
| | | lastDraftCN = 0; |
| | | } |
| | | else |
| | | { |
| | | DraftCNdbIsEmpty=false; |
| | | |
| | | // Get the last DraftCN from the DraftCNdb |
| | | lastDraftCN = draftCNDbH.getLastKey(); |
| | | |
| | | // Get the generalized state associated with the current last DraftCN |
| | | // and initializes from it the startStates table |
| | | String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN); |
| | | if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0)) |
| | | { |
| | | domainsServerStateForLastSeqnum = MultiDomainServerState. |
| | | splitGenStateToServerStates(lastSeqnumGenState); |
| | | } |
| | | } |
| | | |
| | | // Domain by domain |
| | | Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator(); |
| | | if (rsdi != null) |
| | | { |
| | | while (rsdi.hasNext()) |
| | | { |
| | | // process a domain |
| | | ReplicationServerDomain rsd = rsdi.next(); |
| | | |
| | | if (isServiceIDExcluded(rsd.getBaseDn())) |
| | | continue; |
| | | |
| | | // for this domain, have the state in the replchangelog |
| | | // where the last DraftCN update is |
| | | ServerState domainServerStateForLastSeqnum; |
| | | if ((domainsServerStateForLastSeqnum == null) || |
| | | (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null)) |
| | | { |
| | | domainServerStateForLastSeqnum = new ServerState(); |
| | | } |
| | | else |
| | | { |
| | | domainServerStateForLastSeqnum = |
| | | domainsServerStateForLastSeqnum.get(rsd.getBaseDn()); |
| | | } |
| | | |
| | | // Count the number of (eligible) changes from this place |
| | | // to the eligible CN (cross server) |
| | | long ec = rsd.getEligibleCount( |
| | | domainServerStateForLastSeqnum, crossDomainEligibleCN); |
| | | |
| | | // ... hum ... |
| | | if ((ec>0) && (DraftCNdbIsEmpty==false)) |
| | | ec--; |
| | | |
| | | // cumulates on domains |
| | | lastDraftCN += ec; |
| | | |
| | | // DraftCN is empty and there are eligible updates in the repl changelog |
| | | // then init first DraftCN |
| | | if ((ec>0) && (firstDraftCN==0)) |
| | | firstDraftCN = 1; |
| | | } |
| | | } |
| | | return new int[]{firstDraftCN, lastDraftCN}; |
| | | } |
| | | |
| | | private boolean isServiceIDExcluded(String serviceID) |
| | | { |
| | | // skip the excluded domains |
| | | boolean excluded = false; |
| | | for(String excludedServiceID : this.excludedServiceIDs) |
| | | { |
| | | if (excludedServiceID.equalsIgnoreCase(serviceID)) |
| | | { |
| | | excluded=true; |
| | | break; |
| | | } |
| | | } |
| | | return excluded; |
| | | } |
| | | } |