| | |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg |
| | | .ECLRequestType.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg |
| | | .Persistent.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | * |
| | | * @see #getSearchPhase() |
| | | */ |
| | | public static int INIT_PHASE = 1; |
| | | private static int INIT_PHASE = 1; |
| | | /** |
| | | * The persistent phase is only used for persistent searches on the External |
| | | * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the |
| | |
| | | */ |
| | | private static int PERSISTENT_PHASE = 2; |
| | | |
| | | /** |
| | | * This is a string identifying the operation, provided by the client part of |
| | | * the ECL, used to help interpretation of messages logged. |
| | | */ |
| | | private String operationId; |
| | | private StartECLSessionMsg startECLSessionMsg; |
| | | |
| | | /** Cursor on the {@link ChangeNumberIndexDB}. */ |
| | | private DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor; |
| | | |
| | | private boolean draftCompat = false; |
| | | /** |
| | | * Specifies the last change number requested. |
| | | */ |
| | | private long lastChangeNumber = 0; |
| | | /** |
| | | * Specifies whether the change number db has been read until its end. |
| | | */ |
| | | private boolean isEndOfCNIndexDBReached = false; |
| | | /** |
| | | * Specifies whether the current search has been requested to be persistent |
| | | * or not. |
| | | */ |
| | | private short isPersistent; |
| | | /** |
| | | * Specifies the current search phase: INIT or PERSISTENT. |
| | | */ |
| | | private int searchPhase = INIT_PHASE; |
| | |
| | | * (thus becoming the "current" cookie just before the change is returned. |
| | | */ |
| | | private MultiDomainServerState previousCookie = new MultiDomainServerState(); |
| | | /** |
| | | * Specifies the excluded DNs (like cn=admin, ...). |
| | | */ |
| | | private Set<String> excludedBaseDNs = new HashSet<String>(); |
| | | |
| | | /** |
| | | * Eligible CSN - only changes older or equal to eligibleCSN are published in |
| | |
| | | * Provides a string representation of this object. |
| | | * @return the string representation. |
| | | */ |
| | | public String dumpState() |
| | | private String dumpState() |
| | | { |
| | | return getClass().getCanonicalName() + |
| | | "[" + |
| | | "[draftCompat=" + draftCompat + |
| | | "] [persistent=" + isPersistent + |
| | | "] [startChangeNumber=" + lastChangeNumber + |
| | | "] [persistent=" + startECLSessionMsg.getPersistent() + |
| | | "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() + |
| | | "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached + |
| | | "] [searchPhase=" + searchPhase + |
| | | "] [startCookie=" + startCookie + |
| | |
| | | // Initializes each and every domain with the next(first) eligible message |
| | | // from the domain. |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.computeNextEligibleMessageForDomain(operationId); |
| | | domainCtxt.computeNextEligibleMessageForDomain(getOperationId()); |
| | | |
| | | if (domainCtxt.nextMsg == null) |
| | | domainCtxt.active = false; |
| | |
| | | continue; |
| | | |
| | | // skip the excluded domains |
| | | Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); |
| | | if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString())) |
| | | { |
| | | // this is an excluded domain |
| | |
| | | newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate(); |
| | | |
| | | // Assign the start state for the domain |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = latestState; |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | |
| | | if (this.serverId != 0) |
| | | { |
| | | return eclServer + serverId + " " + serverURL + " " |
| | | + getBaseDNString() + " " + operationId; |
| | | + getBaseDNString() + " " + getOperationId(); |
| | | } |
| | | return eclServer + getClass().getCanonicalName() + " " + operationId; |
| | | return eclServer + getClass().getCanonicalName() + " " + getOperationId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | private void initialize(StartECLSessionMsg startECLSessionMsg) |
| | | throws DirectoryException |
| | | { |
| | | this.operationId = startECLSessionMsg.getOperationId(); |
| | | this.startECLSessionMsg = startECLSessionMsg; |
| | | |
| | | isPersistent = startECLSessionMsg.isPersistent(); |
| | | lastChangeNumber = startECLSessionMsg.getLastChangeNumber(); |
| | | searchPhase = INIT_PHASE; |
| | | final String cookie = startECLSessionMsg.getCrossDomainServerState(); |
| | | try |
| | |
| | | ERR_INVALID_COOKIE_SYNTAX.get(cookie)); |
| | | } |
| | | |
| | | excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); |
| | | refreshEligibleCSN(); |
| | | |
| | | initializeChangelogSearch(startECLSessionMsg); |
| | | |
| | | if (session != null) |
| | |
| | | // TODO:ECL Potential race condition if writer not yet resumed here |
| | | } |
| | | |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | closeInitPhase(); |
| | | } |
| | |
| | | registerIntoDomain(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId |
| | | TRACER.debugInfo(getClass().getCanonicalName() + " " + getOperationId() |
| | | + " initialized: " + " " + dumpState() + " " + " " |
| | | + domaimCtxtsToString("")); |
| | | } |
| | |
| | | private void initializeChangelogSearch(StartECLSessionMsg msg) |
| | | throws DirectoryException |
| | | { |
| | | short requestType = msg.getECLRequestType(); |
| | | if (requestType == REQUEST_TYPE_FROM_COOKIE) |
| | | refreshEligibleCSN(); |
| | | |
| | | if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE) |
| | | { |
| | | initializeCLSearchFromCookie(msg.getCrossDomainServerState()); |
| | | } |
| | | else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER) |
| | | else if (msg.getECLRequestType() == REQUEST_TYPE_FROM_CHANGE_NUMBER) |
| | | { |
| | | initializeCLSearchFromChangeNumber(msg.getFirstChangeNumber()); |
| | | } |
| | |
| | | } |
| | | if (oldestContext.active) |
| | | { |
| | | oldestContext.computeNextEligibleMessageForDomain(operationId); |
| | | oldestContext.computeNextEligibleMessageForDomain(getOperationId()); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | |
| | | + "looking for the generalized oldest change")); |
| | | |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.computeNextEligibleMessageForDomain(operationId); |
| | | domainCtxt.computeNextEligibleMessageForDomain(getOperationId()); |
| | | } |
| | | |
| | | final DomainContext oldestContext = findDomainCtxtWithOldestChange(); |
| | |
| | | |
| | | private boolean isBeyondLastRequestedChangeNumber(final ECLUpdateMsg change) |
| | | { |
| | | final long lastChangeNumber = startECLSessionMsg.getLastChangeNumber(); |
| | | return draftCompat |
| | | && 0 < lastChangeNumber |
| | | && lastChangeNumber < change.getChangeNumber(); |
| | |
| | | // go to persistent phase if one |
| | | for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true; |
| | | |
| | | if (this.isPersistent != NON_PERSISTENT) |
| | | if (startECLSessionMsg.getPersistent() != NON_PERSISTENT) |
| | | { |
| | | // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE |
| | | searchPhase = PERSISTENT_PHASE; |
| | |
| | | */ |
| | | public String getOperationId() |
| | | { |
| | | return operationId; |
| | | return startECLSessionMsg.getOperationId(); |
| | | } |
| | | |
| | | /** |
| | | * Getter for the persistent property of the current search. |
| | | * @return Whether the current search is persistent or not. |
| | | * Returns whether the current search is a non-persistent search. |
| | | * |
| | | * @return true if the current search is non-persistent, false otherwise |
| | | */ |
| | | public short isPersistent() { |
| | | return this.isPersistent; |
| | | boolean isNonPersistent() |
| | | { |
| | | return startECLSessionMsg.getPersistent() == NON_PERSISTENT; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the current search phase (INIT or PERSISTENT). |
| | | * @return The current search phase (INIT or PERSISTENT). |
| | | * Returns whether the initialization phase has completed. |
| | | * |
| | | * @return true if the initialization phase has completed, false otherwise |
| | | */ |
| | | public int getSearchPhase() { |
| | | return this.searchPhase; |
| | | boolean isInitPhaseDone() |
| | | { |
| | | return this.searchPhase != INIT_PHASE; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | private void refreshEligibleCSN() |
| | | { |
| | | Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); |
| | | eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs); |
| | | } |
| | | |