| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | |
| | | */ |
| | | private String startCookie; |
| | | /** |
| | | * Specifies the value of the cookie before the change currently processed |
| | | * is returned. It is updated with the change number of the change |
| | | * currently processed (thus becoming the "current" cookie just |
| | | * before the change is returned). |
| | | * Specifies the value of the cookie before the change currently processed is |
| | | * returned. It is updated with the CSN of the change currently processed |
| | | * (thus becoming the "current" cookie just before the change is returned). |
| | | */ |
| | | private MultiDomainServerState previousCookie = new MultiDomainServerState(); |
| | | /** |
| | |
| | | private Set<String> excludedBaseDNs = new HashSet<String>(); |
| | | |
| | | /** |
| | | * Eligible changeNumber - only changes older or equal to eligibleCN |
| | | * are published in the ECL. |
| | | * Eligible CSN - only changes older or equal to eligibleCSN are published |
| | | * in the ECL. |
| | | */ |
| | | private ChangeNumber eligibleCN = null; |
| | | private CSN eligibleCSN; |
| | | |
| | | /** |
| | | * Provides a string representation of this object. |
| | |
| | | .append("] [rsd=").append(rsd) |
| | | .append("] [nextMsg=").append(nextMsg).append("(") |
| | | .append(nextMsg != null ? |
| | | new Date(nextMsg.getChangeNumber().getTime()).toString():"") |
| | | new Date(nextMsg.getCSN().getTime()).toString():"") |
| | | .append(")") |
| | | .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg) |
| | | .append("] [startState=").append(startState) |
| | |
| | | |
| | | /** |
| | | * Get the next message eligible regarding |
| | | * the crossDomain eligible CN. Put it in the context table. |
| | | * the crossDomain eligible CSN. Put it in the context table. |
| | | * @param opid The operation id. |
| | | */ |
| | | private void getNextEligibleMessageForDomain(String opid) |
| | |
| | | if (nextNonEligibleMsg != null) |
| | | { |
| | | boolean hasBecomeEligible = |
| | | (nextNonEligibleMsg.getChangeNumber().getTime() |
| | | <= eligibleCN.getTime()); |
| | | (nextNonEligibleMsg.getCSN().getTime() |
| | | <= eligibleCSN.getTime()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + " stored nonEligibleMsg " + nextNonEligibleMsg |
| | | + " has now become eligible regarding " |
| | | + " the eligibleCN ("+ eligibleCN |
| | | + " the eligibleCSN ("+ eligibleCSN |
| | | + " ):" + hasBecomeEligible); |
| | | |
| | | if (hasBecomeEligible) |
| | |
| | | // to be returned in the external changelog. |
| | | // So let's check if the chg time is older than the trim date |
| | | } while ((newMsg!=null) && |
| | | (newMsg.getChangeNumber().getTime() < domainLatestTrimDate)); |
| | | (newMsg.getCSN().getTime() < domainLatestTrimDate)); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() + |
| | |
| | | // in non blocking mode, return null when no more msg |
| | | if (newMsg != null) |
| | | { |
| | | boolean isEligible = (newMsg.getChangeNumber().getTime() |
| | | <= eligibleCN.getTime()); |
| | | boolean isEligible = (newMsg.getCSN().getTime() |
| | | <= eligibleCSN.getTime()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() |
| | | + " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "newMsg isEligible=" + isEligible + " since " |
| | | + "newMsg=[" + newMsg.getChangeNumber() |
| | | + " " + new Date(newMsg.getChangeNumber().getTime()) |
| | | + "] eligibleCN=[" + eligibleCN |
| | | + " " + new Date(eligibleCN.getTime())+"]" |
| | | + "newMsg=[" + newMsg.getCSN() |
| | | + " " + new Date(newMsg.getCSN().getTime()) |
| | | + "] eligibleCSN=[" + eligibleCSN |
| | | + " " + new Date(eligibleCSN.getTime())+"]" |
| | | + dumpState()); |
| | | |
| | | if (isEligible) |
| | |
| | | // startDraftCN provided in the request IS NOT in the DraftCNDb |
| | | |
| | | /* |
| | | * Get the draftLimits (from the eligibleCN got at the beginning of the |
| | | * Get the draftLimits (from the eligibleCSN got at the beginning of the |
| | | * operation) in order to have the first and possible last DraftCN. |
| | | */ |
| | | final int[] limits = |
| | | replicationServer.getECLDraftCNLimits(eligibleCN, excludedBaseDNs); |
| | | replicationServer.getECLDraftCNLimits(eligibleCSN, excludedBaseDNs); |
| | | final int firstDraftCN = limits[0]; |
| | | final int lastDraftCN = limits[1]; |
| | | |
| | |
| | | // Assign the start state for the domain |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = rsd.getEligibleState(eligibleCN); |
| | | newDomainCtxt.startState = rsd.getEligibleState(eligibleCSN); |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDn()); |
| | | } |
| | | else |
| | |
| | | // what we have in the replication changelog |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | ChangeNumber latestTrimCN = |
| | | new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0, 0); |
| | | CSN latestTrimCSN = |
| | | new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0); |
| | | newDomainCtxt.startState = |
| | | rsd.getStartState().duplicateOnlyOlderThan(latestTrimCN); |
| | | rsd.getStartState().duplicateOnlyOlderThan(latestTrimCSN); |
| | | } |
| | | } |
| | | else |
| | |
| | | } |
| | | } |
| | | |
| | | // Set the stop state for the domain from the eligibleCN |
| | | newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN); |
| | | // Set the stop state for the domain from the eligibleCSN |
| | | newDomainCtxt.stopState = rsd.getEligibleState(eligibleCSN); |
| | | } |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | |
| | | */ |
| | | for (int serverId : rsDomain.getStartState()) |
| | | { |
| | | ChangeNumber dbOldestChange = |
| | | rsDomain.getStartState().getChangeNumber(serverId); |
| | | ChangeNumber providedChange = cookie.getChangeNumber(serverId); |
| | | CSN dbOldestChange = |
| | | rsDomain.getStartState().getCSN(serverId); |
| | | CSN providedChange = cookie.getCSN(serverId); |
| | | if (providedChange != null |
| | | && providedChange.older(dbOldestChange)) |
| | | { |
| | |
| | | |
| | | excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); |
| | | replicationServer.disableEligibility(excludedBaseDNs); |
| | | eligibleCN = replicationServer.getEligibleCN(); |
| | | eligibleCSN = replicationServer.getEligibleCSN(); |
| | | |
| | | initializeChangelogSearch(startECLSessionMsg); |
| | | |
| | |
| | | closeInitPhase(); |
| | | } |
| | | |
| | | /* TODO: From replication changenumber |
| | | /* TODO: From replication CSN |
| | | //-- |
| | | if (startCLMsg.getStartMode()==2) |
| | | { |
| | | if (CLSearchFromProvidedExactCN(startCLMsg.getChangeNumber())) |
| | | if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN())) |
| | | return; |
| | | } |
| | | |
| | |
| | | { |
| | | // to get the CL first and last |
| | | initializeCLDomCtxts(null); // from start |
| | | ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN(); |
| | | CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN(); |
| | | |
| | | try |
| | | { |
| | | // to get the CL first and last |
| | | // last rely on the crossDomainEligibleCN thus must have been |
| | | // last relies on the crossDomainEligibleCSN, which thus must have been |
| | | // computed before |
| | | int[] limits = computeCLLimits(crossDomainEligibleCN); |
| | | int[] limits = computeCLLimits(crossDomainEligibleCSN); |
| | | // Send the response |
| | | CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]); |
| | | session.publish(msg); |
| | |
| | | // Set and test the domain of the oldestChange see if we reached |
| | | // the end of the phase for this domain |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getChangeNumber()); |
| | | change.getUpdateMsg().getCSN()); |
| | | |
| | | if (oldestContext.currentState.cover(oldestContext.stopState)) |
| | | { |
| | |
| | | oldestContext.nextMsg = null; // clean |
| | | |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getChangeNumber()); |
| | | change.getUpdateMsg().getCSN()); |
| | | |
| | | if (draftCompat) |
| | | { |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" |
| | | + oldestChange.getUpdateMsg().getChangeNumber()); |
| | | + oldestChange.getUpdateMsg().getCSN()); |
| | | |
| | | // Update the current state |
| | | previousCookie.update( |
| | | oldestChange.getBaseDN(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | oldestChange.getUpdateMsg().getCSN()); |
| | | |
| | | // Set the current value of global state in the returned message |
| | | oldestChange.setCookie(previousCookie); |
| | |
| | | // The following loop iterates until the 2 dbs are positioned on the |
| | | // same cn |
| | | |
| | | // replogcn : the oldest change from the changelog db |
| | | ChangeNumber cnFromChangelogDb = |
| | | oldestChange.getUpdateMsg().getChangeNumber(); |
| | | // replogCSN : the oldest change from the changelog db |
| | | CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN(); |
| | | String dnFromChangelogDb = oldestChange.getBaseDN(); |
| | | |
| | | while (true) |
| | |
| | | |
| | | |
| | | // the next change from the DraftCN db |
| | | ChangeNumber cnFromDraftCNDb = changelogDBIter.getChangeNumber(); |
| | | CSN csnFromDraftCNDb = changelogDBIter.getCSN(); |
| | | String dnFromDraftCNDb = changelogDBIter.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " comparing the 2 db DNs :" + dnFromChangelogDb + "?=" |
| | | + cnFromChangelogDb + " timestamps:" |
| | | + new Date(cnFromChangelogDb.getTime()) + " ?older" |
| | | + new Date(cnFromDraftCNDb.getTime())); |
| | | + csnFromChangelogDb + " timestamps:" |
| | | + new Date(csnFromChangelogDb.getTime()) + " ?older" |
| | | + new Date(csnFromDraftCNDb.getTime())); |
| | | |
| | | |
| | | if (areSameChange(cnFromChangelogDb, dnFromChangelogDb, |
| | | cnFromDraftCNDb, dnFromDraftCNDb)) |
| | | if (areSameChange(csnFromChangelogDb, dnFromChangelogDb, |
| | | csnFromDraftCNDb, dnFromDraftCNDb)) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | |
| | | } |
| | | |
| | | |
| | | if (!cnFromDraftCNDb.older(cnFromChangelogDb)) |
| | | if (!csnFromDraftCNDb.older(csnFromChangelogDb)) |
| | | { |
| | | // the change from the changelogDb is older |
| | | // it should have been stored lately |
| | | // let's continue to traverse the changelogdb |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate: will skip " + cnFromChangelogDb |
| | | TRACER.debugInfo("getNextECLUpdate: will skip " + csnFromChangelogDb |
| | | + " and read next from the regular changelog."); |
| | | return false; // TO BE CHECKED |
| | | } |
| | |
| | | // found in the changelogDb. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " will skip " + cnFromDraftCNDb |
| | | + " will skip " + csnFromDraftCNDb |
| | | + " and read next change from the DraftCNDb."); |
| | | |
| | | isEndOfDraftCNReached = !changelogDBIter.next(); |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " has skipped to " + " sn=" + changelogDBIter.getDraftCN() |
| | | + " cn=" + changelogDBIter.getChangeNumber() |
| | | + " csn=" + changelogDBIter.getCSN() |
| | | + " End of draftCNDb ?" + isEndOfDraftCNReached); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean areSameChange(ChangeNumber cn1, String dn1, ChangeNumber cn2, |
| | | String dn2) |
| | | private boolean areSameChange(CSN csn1, String dn1, CSN csn2, String dn2) |
| | | { |
| | | boolean sameDN = dn1.compareTo(dn2) == 0; |
| | | boolean sameCN = cn1.compareTo(cn2) == 0; |
| | | return sameDN && sameCN; |
| | | boolean sameCSN = csn1.compareTo(csn2) == 0; |
| | | return sameDN && sameCSN; |
| | | } |
| | | |
| | | private void assignNewDraftCNAndStore(ECLUpdateMsg change) |
| | |
| | | change.getDraftChangeNumber(), |
| | | previousCookie.toString(), |
| | | change.getBaseDN(), |
| | | change.getUpdateMsg().getChangeNumber()); |
| | | change.getUpdateMsg().getCSN()); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Refresh the eligibleCN by requesting the replication server. |
| | | * Refresh the eligibleCSN by requesting the replication server. |
| | | */ |
| | | public void refreshEligibleCN() |
| | | public void refreshEligibleCSN() |
| | | { |
| | | eligibleCN = replicationServer.getEligibleCN(); |
| | | eligibleCSN = replicationServer.getEligibleCSN(); |
| | | } |
| | | |
| | | } |