| | |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | |
| | | "] [searchPhase=" + searchPhase + |
| | | "] [startCookie=" + startCookie + |
| | | "] [previousCookie=" + previousCookie + |
| | | "]]"; |
| | | "]]"; |
| | | } |
| | | |
| | | /** |
| | |
| | | private String clDomCtxtsToString(String msg) |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | buffer.append(msg); |
| | | buffer.append("\n"); |
| | | buffer.append(msg).append("\n"); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.toString(buffer); |
| | | buffer.append("\n"); |
| | |
| | | * @param startECLSessionMsg the start parameters. |
| | | * @throws DirectoryException when an errors occurs. |
| | | */ |
| | | public ECLServerHandler( |
| | | ReplicationServer replicationServer, |
| | | StartECLSessionMsg startECLSessionMsg) |
| | | throws DirectoryException |
| | | public ECLServerHandler(ReplicationServer replicationServer, |
| | | StartECLSessionMsg startECLSessionMsg) throws DirectoryException |
| | | { |
| | | // queueSize is hard coded to 1 else super class hangs for some reason |
| | | this(null, 1, replicationServer, 0); |
| | |
| | | public void initializeCLSearchFromGenState(String crossDomainStartState) |
| | | throws DirectoryException |
| | | { |
| | | initializeCLDomCtxts(crossDomainStartState, false); |
| | | initializeChangelogDomainCtxts(crossDomainStartState, false); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void initializeCLSearchFromDraftCN(int startDraftCN) |
| | | throws DirectoryException |
| | | { |
| | | String crossDomainStartState; |
| | | try |
| | | { |
| | | String crossDomainStartState; |
| | | draftCompat = true; |
| | | |
| | | DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler(); |
| | |
| | | } |
| | | this.draftCompat = true; |
| | | |
| | | initializeCLDomCtxts(crossDomainStartState, true); |
| | | initializeChangelogDomainCtxts(crossDomainStartState, true); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | * in the provided cookie. |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeCLDomCtxts(String providedCookie, |
| | | public void initializeChangelogDomainCtxts(String providedCookie, |
| | | boolean allowUnknownDomains) |
| | | throws DirectoryException |
| | | { |
| | |
| | | newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate(); |
| | | |
| | | // Assign the start state for the domain |
| | | if (isPersistent == |
| | | StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = rsd.getEligibleState(eligibleCN); |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDn()); |
| | |
| | | } |
| | | else if (!newDomainCtxt.startState.isEmpty()) |
| | | { |
| | | /* |
| | | when the provided startState is older than the replication |
| | | changelogdb startState, it means that the replication |
| | | changelog db has been trimmed and the cookie is not valid |
| | | anymore. |
| | | */ |
| | | boolean cookieTooOld = false; |
| | | for (int aServerId : rsd.getStartState()) |
| | | if (hasCookieBeenTrimmedFromDB(rsd, newDomainCtxt.startState)) |
| | | { |
| | | ChangeNumber dbOldestChange = |
| | | rsd.getStartState().getMaxChangeNumber(aServerId); |
| | | ChangeNumber providedChange = |
| | | newDomainCtxt.startState.getMaxChangeNumber(aServerId); |
| | | if (providedChange != null |
| | | && providedChange.older(dbOldestChange)) |
| | | { |
| | | cookieTooOld=true; |
| | | } |
| | | } |
| | | |
| | | if (cookieTooOld) |
| | | { |
| | | // the provided start |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | newDomainCtxt.rsd.getBaseDn())); |
| | |
| | | StringBuilder sb = new StringBuilder(); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | sb.append(domainCtxt.rsd.getBaseDn()).append(":") |
| | | .append(domainCtxt.startState).append(";"); |
| | | .append(domainCtxt.startState).append(";"); |
| | | } |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | |
| | | " initializeCLDomCtxts ends with " + " " + dumpState()); |
| | | } |
| | | |
| | | private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain, |
| | | ServerState cookie) |
| | | { |
| | | /* |
| | | when the provided startState is older than the replication |
| | | changelogdb startState, it means that the replication |
| | | changelog db has been trimmed and the cookie is not valid |
| | | anymore. |
| | | */ |
| | | for (int serverId : rsDomain.getStartState()) |
| | | { |
| | | ChangeNumber dbOldestChange = |
| | | rsDomain.getStartState().getMaxChangeNumber(serverId); |
| | | ChangeNumber providedChange = cookie.getMaxChangeNumber(serverId); |
| | | if (providedChange != null |
| | | && providedChange.older(dbOldestChange)) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Registers this handler into its related domain and notifies the domain. |
| | | */ |
| | | private void registerIntoDomain() |
| | | { |
| | | if (replicationServerDomain!=null) |
| | | if (replicationServerDomain != null) |
| | | replicationServerDomain.registerHandler(this); |
| | | } |
| | | |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | String str = serverURL + " " + String.valueOf(serverId); |
| | | |
| | | return "Connected External Changelog Server " + str + |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT; |
| | | return "Connected External Changelog Server " + serverURL + " " + serverId |
| | | + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT; |
| | | } |
| | | |
| | | /** |
| | |
| | | // TODO:ECL No monitoring exist for ECL. |
| | | return attributes; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String localString; |
| | | localString = "External changelog Server "; |
| | | final String eclServer = "External changelog Server "; |
| | | if (this.serverId != 0) |
| | | localString += serverId + " " + serverURL + " " + getBaseDN() |
| | | + " " + this.getOperationId(); |
| | | else |
| | | localString += this.getClass().getCanonicalName()+ " " + operationId; |
| | | return localString; |
| | | { |
| | | return eclServer + serverId + " " + serverURL + " " + getBaseDN() + " " |
| | | + operationId; |
| | | } |
| | | return eclServer + getClass().getCanonicalName() + " " + operationId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the status of the connected DS. |
| | | * @return The status of the connected DS. |
| | |
| | | replicationServer.disableEligibility(excludedBaseDNs); |
| | | eligibleCN = replicationServer.getEligibleCN(); |
| | | |
| | | if (startECLSessionMsg.getECLRequestType()== |
| | | StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE) |
| | | { |
| | | initializeCLSearchFromGenState( |
| | | startECLSessionMsg.getCrossDomainServerState()); |
| | | } |
| | | else if (startECLSessionMsg.getECLRequestType()== |
| | | StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER) |
| | | { |
| | | initializeCLSearchFromDraftCN( |
| | | startECLSessionMsg.getFirstDraftChangeNumber()); |
| | | } |
| | | initializeChangelogSearch(startECLSessionMsg); |
| | | |
| | | if (session != null) |
| | | { |
| | |
| | | // TODO:ECL Potential race condition if writer not yet resumed here |
| | | } |
| | | |
| | | if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | closeInitPhase(); |
| | | } |
| | |
| | | " initialized: " + |
| | | " " + dumpState() + " " + |
| | | " " + clDomCtxtsToString("")); |
| | | } |
| | | |
| | | private void initializeChangelogSearch(StartECLSessionMsg msg) |
| | | throws DirectoryException |
| | | { |
| | | short requestType = msg.getECLRequestType(); |
| | | if (requestType == REQUEST_TYPE_FROM_COOKIE) |
| | | { |
| | | initializeCLSearchFromGenState(msg.getCrossDomainServerState()); |
| | | } |
| | | else if (requestType == REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER) |
| | | { |
| | | initializeCLSearchFromDraftCN(msg.getFirstDraftChangeNumber()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | public ECLUpdateMsg takeECLUpdate() |
| | | throws DirectoryException |
| | | { |
| | | boolean interrupted = true; |
| | | ECLUpdateMsg msg = getNextECLUpdate(); |
| | | |
| | | // TODO:ECL We should refactor so that a SH always have a session |
| | | if (session == null) |
| | | return msg; |
| | | |
| | | boolean interrupted = true; |
| | | boolean acquired = false; |
| | | do |
| | | { |
| | |
| | | // Default is not to loop, with one exception |
| | | continueLooping = false; |
| | | |
| | | int iDom = getOldestChangeFromDomainCtxts(); |
| | | if (iDom == -1) |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | if (oldestContext == null) |
| | | { // there is no oldest change to process |
| | | closeInitPhase(); |
| | | |
| | |
| | | } |
| | | |
| | | // Build the ECLUpdateMsg to be returned |
| | | DomainContext oldestContext = domainCtxts[iDom]; |
| | | String suffix = oldestContext.rsd.getBaseDn(); |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)oldestContext.nextMsg, |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // cookie will be set later |
| | | suffix, |
| | | 0); // draftChangeNumber may be set later |
| | |
| | | "looking for the generalized oldest change"); |
| | | |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | // get next msg |
| | | domainCtxt.getNextEligibleMessageForDomain(operationId); |
| | | } |
| | | |
| | | // take the oldest one |
| | | int iDom = getOldestChangeFromDomainCtxts(); |
| | | if (iDom != -1) |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | if (oldestContext != null) |
| | | { |
| | | DomainContext oldestContext = domainCtxts[iDom]; |
| | | String suffix = oldestContext.rsd.getBaseDn(); |
| | | |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)oldestContext.nextMsg, |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // set later |
| | | suffix, 0); |
| | | oldestContext.nextMsg = null; // clean |
| | |
| | | // go to persistent phase if one |
| | | for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true; |
| | | |
| | | if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT) |
| | | if (this.isPersistent != NON_PERSISTENT) |
| | | { |
| | | // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE |
| | | searchPhase = PERSISTENT_PHASE; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the index in the domainCtxt table of the domain with the oldest change. |
| | | * @return the index of the domain with the oldest change, -1 when none. |
| | | * Find the domainCtxt of the domain with the oldest change. |
| | | * |
| | | * @return the domainCtxt of the domain with the oldest change, null when |
| | | * none. |
| | | */ |
| | | private int getOldestChangeFromDomainCtxts() |
| | | private DomainContext findOldestChangeFromDomainCtxts() |
| | | { |
| | | int oldest = -1; |
| | | for (int i=0; i<domainCtxts.length; i++) |
| | | DomainContext oldestCtxt = null; |
| | | for (DomainContext domainCtxt : domainCtxts) |
| | | { |
| | | if (domainCtxts[i].active) |
| | | if (domainCtxt.active |
| | | // .msg is null when the previous (non blocking) nextMessage did |
| | | // not have any eligible msg to return |
| | | && domainCtxt.nextMsg != null |
| | | && (oldestCtxt == null |
| | | || domainCtxt.nextMsg.compareTo(oldestCtxt.nextMsg) < 0)) |
| | | { |
| | | // on the first loop, oldest==-1 |
| | | // .msg is null when the previous (non blocking) nextMessage did |
| | | // not have any eligible msg to return |
| | | if (domainCtxts[i].nextMsg != null) |
| | | { |
| | | if ((oldest==-1) || |
| | | (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0)) |
| | | { |
| | | oldest = i; |
| | | } |
| | | } |
| | | oldestCtxt = domainCtxt; |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In cn=changelog" |
| | | + "," + this + " getOldestChangeFromDomainCtxts() returns " + |
| | | ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1")); |
| | | TRACER.debugInfo("In cn=changelog," + this |
| | | + " getOldestChangeFromDomainCtxts() returns " |
| | | + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1")); |
| | | |
| | | return oldest; |
| | | return oldestCtxt; |
| | | } |
| | | |
| | | /** |