| | |
| | | private MultiDomainServerState previousCookie = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Eligible CSN - only changes older than or equal to eligibleCSN are |
| | | * published in the ECL. |
| | | */ |
| | | private CSN eligibleCSN; |
| | | |
| | | /** |
| | | * The global list of contexts by domain for the search currently processed. |
| | | */ |
| | | private Set<DomainContext> domainCtxts = Collections.emptySet(); |
| | |
| | | */ |
| | | private String dumpState() |
| | | { |
| | | return getClass().getCanonicalName() + |
| | | "[" + |
| | | "[draftCompat=" + draftCompat + |
| | | "] [persistent=" + startECLSessionMsg.getPersistent() + |
| | | "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() + |
| | | "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached + |
| | | "] [searchPhase=" + searchPhase + |
| | | "] [startCookie=" + startCookie + |
| | | "] [previousCookie=" + previousCookie + |
| | | "]]"; |
| | | return getClass().getSimpleName() + |
| | | " [draftCompat=" + draftCompat + |
| | | ", persistent=" + startECLSessionMsg.getPersistent() + |
| | | ", startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() + |
| | | ", endOfCNIndexDBReached=" + isEndOfCNIndexDBReached + |
| | | ", searchPhase=" + searchPhase + |
| | | ", startCookie=" + startCookie + |
| | | ", previousCookie=" + previousCookie + |
| | | "]"; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | private class DomainContext |
| | | { |
| | | private ReplicationServerDomain rsDomain; |
| | | private final ReplicationServerDomain rsDomain; |
| | | |
| | | /** |
| | | * active when there are still changes supposed eligible for the ECL. |
| | | * Active when there are still changes supposed eligible for the ECL. It is |
| | | * active by default. |
| | | */ |
| | | private boolean active; |
| | | private boolean active = true; |
| | | private UpdateMsg nextMsg; |
| | | |
| | | /** |
| | | * The message handler from which the changes for this domain are read. |
| | | */ |
| | | private MessageHandler mh; |
| | | private UpdateMsg nextMsg; |
| | | private UpdateMsg nextNonEligibleMsg; |
| | | private ServerState startState; |
| | | private ServerState currentState; |
| | | private ServerState stopState; |
| | | private long domainLatestTrimDate; |
| | | private final MessageHandler mh; |
| | | private final ServerState startState; |
| | | private final ServerState currentState = new ServerState(); |
| | | private final ServerState stopState; |
| | | private final long domainLatestTrimDate; |
| | | |
| | | /** |
| | | * Creates the context holding the per-domain state of the search. |
| | | * The latest trim date is read from the domain once, at construction time. |
| | | * |
| | | * @param domain the replication server domain stored in this context |
| | | * @param startState the server state stored as this context's start state |
| | | * @param stopState the server state stored as this context's stop state |
| | | * @param mh the message handler stored in this context |
| | | */ |
| | | public DomainContext(ReplicationServerDomain domain, |
| | | ServerState startState, ServerState stopState, MessageHandler mh) |
| | | { |
| | | this.rsDomain = domain; |
| | | this.startState = startState; |
| | | this.stopState = stopState; |
| | | this.mh = mh; |
| | | this.domainLatestTrimDate = domain.getLatestDomainTrimDate(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | toString(buffer); |
| | | return buffer.toString(); |
| | | } |
| | | /** |
| | | * Provides a string representation of this object for debugging purposes. |
| | | * @param buffer Append to this buffer. |
| | | */ |
| | | public void toString(StringBuilder buffer) |
| | | |
| | | private StringBuilder toString(StringBuilder buffer) |
| | | { |
| | | buffer.append("[ [active=").append(active) |
| | | .append("] [rsDomain=").append(rsDomain) |
| | | .append("] [nextMsg=").append(nextMsg).append("(") |
| | | .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "") |
| | | .append(")") |
| | | .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg) |
| | | .append("] [startState=").append(startState) |
| | | .append("] [currentState=").append(currentState) |
| | | .append("] [stopState=").append(stopState) |
| | | .append("]]"); |
| | | buffer.append(getClass().getSimpleName()); |
| | | buffer.append(" ["); |
| | | buffer.append(active ? "active" : "inactive"); |
| | | buffer.append(", baseDN=\"").append(rsDomain.getBaseDN()).append("\""); |
| | | if (nextMsg != null) |
| | | { |
| | | buffer.append(", csn=").append(nextMsg.getCSN().toStringUI()); |
| | | } |
| | | buffer.append(", nextMsg=[").append(nextMsg); |
| | | buffer.append("]") |
| | | .append(", startState=").append(startState) |
| | | .append(", currentState=").append(currentState) |
| | | .append(", stopState=").append(stopState) |
| | | .append("]"); |
| | | return buffer; |
| | | } |
| | | |
| | | /** |
| | | * Computes the next message eligible regarding the crossDomain eligible |
| | | * CSN. |
| | | * |
| | | * @param opId The operation id. |
| | | * Computes the next available message for this domain context. |
| | | */ |
| | | private void computeNextEligibleMessageForDomain(String opId) |
| | | private void computeNextAvailableMessage() |
| | | { |
| | | nextMsg = getNextMessage(); |
| | | if (debugEnabled()) |
| | | debugInfo(opId, "ctxt=" + this); |
| | | |
| | | assert(nextMsg == null); |
| | | try |
| | | { |
| | | // Before get a new message from the domain, evaluate in priority |
| | | // a message that has not been published to the ECL because it was |
| | | // not eligible |
| | | if (nextNonEligibleMsg != null) |
| | | { |
| | | final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg); |
| | | |
| | | if (debugEnabled()) |
| | | debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg |
| | | + " has now become eligible regarding the eligibleCSN (" |
| | | + eligibleCSN + " ): " + hasBecomeEligible); |
| | | |
| | | if (hasBecomeEligible) |
| | | { |
| | | nextMsg = nextNonEligibleMsg; |
| | | nextNonEligibleMsg = null; |
| | | } |
| | | // else the oldest is still not eligible - let's wait next |
| | | } |
| | | else |
| | | { |
| | | // Here comes a new message !!! |
| | | final UpdateMsg newMsg = getNextMessage(); |
| | | if (newMsg == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | debugInfo(opId, "got new message : [newMsg=" + newMsg + "] " |
| | | + dumpState()); |
| | | |
| | | final boolean isEligible = isEligible(newMsg); |
| | | |
| | | if (debugEnabled()) |
| | | debugInfo(opId, "newMsg isEligible=" + isEligible + " since " |
| | | + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=[" |
| | | + toString(eligibleCSN) + "] " + dumpState()); |
| | | |
| | | if (isEligible) |
| | | { |
| | | nextMsg = newMsg; |
| | | } |
| | | else |
| | | { |
| | | nextNonEligibleMsg = newMsg; |
| | | } |
| | | } |
| | | TRACER.debugInfo("In ECLServerHandler, for baseDN=" |
| | | + mh.getBaseDNString() + " computeNextAvailableMessage(" |
| | | + getOperationId() + ") : newMsg=[" + nextMsg + "] " + dumpState()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | |
| | | private boolean isEligible(UpdateMsg msg) |
| | | { |
| | | return msg.getCSN().getTime() <= eligibleCSN.getTime(); |
| | | } |
| | | |
| | | private UpdateMsg getNextMessage() |
| | |
| | | } |
| | | } |
| | | |
| | | private String toString(CSN csn) |
| | | { |
| | | return csn + " " + asDate(csn); |
| | | } |
| | | |
| | | private void debugInfo(String opId, String message) |
| | | { |
| | | TRACER.debugInfo("In ECLServerHandler, for baseDN=" |
| | | + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId |
| | | + ") " + message); |
| | | } |
| | | |
| | | /** |
| | | * Unregister the handler from the DomainContext ReplicationDomain. |
| | | * @return Whether the handler has been unregistered with success. |
| | |
| | | |
| | | private String domaimCtxtsToString(String msg) |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | final StringBuilder buffer = new StringBuilder(); |
| | | buffer.append(msg).append("\n"); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.toString(buffer); |
| | | buffer.append("\n"); |
| | | domainCtxt.toString(buffer).append("\n"); |
| | | } |
| | | return buffer.toString(); |
| | | } |
| | |
| | | // Initializes each and every domain with the next(first) eligible message |
| | | // from the domain. |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.computeNextEligibleMessageForDomain(getOperationId()); |
| | | domainCtxt.computeNextAvailableMessage(); |
| | | |
| | | if (domainCtxt.nextMsg == null) |
| | | { |
| | | domainCtxt.active = false; |
| | | } |
| | | } |
| | | } |
| | | catch(DirectoryException de) |
| | |
| | | continue; |
| | | |
| | | // Creates the new domain context |
| | | final DomainContext newDomainCtxt = new DomainContext(); |
| | | newDomainCtxt.active = true; |
| | | newDomainCtxt.rsDomain = domain; |
| | | newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate(); |
| | | |
| | | // Assign the start state for the domain |
| | | final DomainContext newDomainCtxt; |
| | | final ServerState domainStartState = |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = latestState; |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | newDomainCtxt = newDomainContext(domain, null, latestState); |
| | | } |
| | | else |
| | | { |
| | | // let's take the start state for this domain from the provided cookie |
| | | newDomainCtxt.startState = |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | |
| | | ServerState startState = domainStartState; |
| | | if (providedCookie == null || providedCookie.length() == 0 |
| | | || allowUnknownDomains) |
| | | { |
| | | // when there is no cookie provided in the request, |
| | | // let's start traversing this domain from the beginning of |
| | | // what we have in the replication changelog |
| | | if (newDomainCtxt.startState == null) |
| | | if (startState == null) |
| | | { |
| | | newDomainCtxt.startState = |
| | | startState = |
| | | domain.getOldestState().duplicateOnlyOlderThan( |
| | | newDomainCtxt.domainLatestTrimDate); |
| | | domain.getLatestDomainTrimDate()); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // when there is a cookie provided in the request, |
| | | if (newDomainCtxt.startState == null) |
| | | if (startState == null) |
| | | { |
| | | missingDomains.append(domain.getBaseDN()).append(":;"); |
| | | continue; |
| | | } |
| | | else if (!newDomainCtxt.startState.isEmpty() |
| | | && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState)) |
| | | else if (!startState.isEmpty() |
| | | && hasCookieBeenTrimmedFromDB(domain, startState)) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | newDomainCtxt.rsDomain.getBaseDN().toNormalizedString())); |
| | | domain.getBaseDN().toNormalizedString())); |
| | | } |
| | | } |
| | | |
| | | newDomainCtxt.stopState = latestState; |
| | | newDomainCtxt = newDomainContext(domain, startState, latestState); |
| | | } |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer); |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | mh.setBaseDNAndDomain(domain.getBaseDN(), false); |
| | | // register the unconnected into the domain |
| | | domain.registerHandler(mh); |
| | | newDomainCtxt.mh = mh; |
| | | |
| | | previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(), |
| | | newDomainCtxt.startState.duplicate()); |
| | |
| | | results.add(newDomainCtxt); |
| | | } |
| | | |
| | | if (missingDomains.length()>0) |
| | | if (missingDomains.length() > 0) |
| | | { |
| | | // If there are domains missing from the provided cookie, |
| | | // the request is rejected and a full resync is required. |
| | |
| | | */ |
| | | if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains) |
| | | { |
| | | // JNR: Will the following code trigger a ConcurrentModificationException? |
| | | for (DN providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | final Set<DN> providedDomains = startStatesFromProvidedCookie.keySet(); |
| | | for (Iterator<DN> iter = providedDomains.iterator(); iter.hasNext();) |
| | | { |
| | | DN providedDomain = iter.next(); |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | { |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | | iter.remove(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Now do the final checking |
| | |
| | | return results; |
| | | } |
| | | |
| | | private DomainContext newDomainContext(ReplicationServerDomain domain, |
| | | ServerState startState, ServerState stopState) throws DirectoryException |
| | | { |
| | | // Create an unconnected MessageHandler for the domain, seeded with startState (NOTE(review): a caller passes null here - confirm setInitialServerState accepts it) |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer); |
| | | mh.setInitialServerState(startState); |
| | | mh.setBaseDNAndDomain(domain.getBaseDN(), false); |
| | | // Register the unconnected handler with the domain before wrapping it in the returned DomainContext |
| | | domain.registerHandler(mh); |
| | | |
| | | return new DomainContext(domain, startState, stopState, mh); |
| | | } |
| | | |
| | | private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain, |
| | | ServerState cookie) |
| | | { |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(getClass().getCanonicalName() + " " + getOperationId() |
| | | + " initialized: " + " " + dumpState() + " " + " " |
| | | + domaimCtxtsToString("")); |
| | | + " initialized: " + " " + dumpState() + domaimCtxtsToString("")); |
| | | } |
| | | |
| | | private void initializeChangelogSearch(StartECLSessionMsg msg) |
| | | throws DirectoryException |
| | | { |
| | | refreshEligibleCSN(); |
| | | |
| | | if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE) |
| | | { |
| | | initializeCLSearchFromCookie(msg.getCrossDomainServerState()); |
| | |
| | | */ |
| | | public ECLUpdateMsg takeECLUpdate() throws DirectoryException |
| | | { |
| | | refreshEligibleCSN(); |
| | | ECLUpdateMsg msg = getNextECLUpdate(); |
| | | |
| | | // TODO:ECL We should refactor so that a SH always have a session |
| | |
| | | public ECLUpdateMsg getNextECLUpdate() throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In cn=changelog" + this + |
| | | TRACER.debugInfo("In cn=changelog " + this + |
| | | " getNextECLUpdate starts: " + dumpState()); |
| | | |
| | | ECLUpdateMsg oldestChange = null; |
| | |
| | | } |
| | | if (oldestContext.active) |
| | | { |
| | | oldestContext.computeNextEligibleMessageForDomain(getOperationId()); |
| | | oldestContext.computeNextAvailableMessage(); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | |
| | | "In getNextECLUpdate (persistent): " |
| | | + "looking for the generalized oldest change")); |
| | | |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.computeNextEligibleMessageForDomain(getOperationId()); |
| | | for (DomainContext domainCtxt : domainCtxts) |
| | | { |
| | | domainCtxt.computeNextAvailableMessage(); |
| | | } |
| | | |
| | | final DomainContext oldestContext = findDomainCtxtWithOldestChange(); |
| | |
| | | return this.searchPhase != INIT_PHASE; |
| | | } |
| | | |
| | | /** |
| | | * Refreshes the eligibleCSN field by asking the replication server for the |
| | | * eligible CSN, passing it the excluded base DNs taken from the start ECL |
| | | * session message. |
| | | */ |
| | | private void refreshEligibleCSN() |
| | | { |
| | | Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); |
| | | eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs); |
| | | } |
| | | |
| | | } |