| | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | |
| | | public final class ECLServerHandler extends ServerHandler |
| | | { |
| | | |
| | | private static int UNDEFINED_PHASE = 0; |
| | | /** TODO JNR. */ |
| | | public static int INIT_PHASE = 1; |
| | | private static int PERSISTENT_PHASE = 2; |
| | | |
| | | /** |
| | | * This is a string identifying the operation, provided by the client part of |
| | | * the ECL, used to help interpretation of messages logged. |
| | |
| | | private CSN eligibleCSN; |
| | | |
| | | /** |
| | | * The global list of contexts by domain for the search currently processed. |
| | | */ |
| | | private DomainContext[] domainCtxts = new DomainContext[0]; |
| | | |
| | | /** |
| | | * Provides a string representation of this object. |
| | | * @return the string representation. |
| | | */ |
| | |
| | | buffer.append("[ [active=").append(active) |
| | | .append("] [rsd=").append(rsd) |
| | | .append("] [nextMsg=").append(nextMsg).append("(") |
| | | .append(nextMsg != null ? |
| | | new Date(nextMsg.getCSN().getTime()).toString():"") |
| | | .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "") |
| | | .append(")") |
| | | .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg) |
| | | .append("] [startState=").append(startState) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the next message eligible regarding |
| | | * the crossDomain eligible CSN. Put it in the context table. |
| | | * @param opid The operation id. |
| | | * Computes the next message eligible regarding the crossDomain eligible |
| | | * CSN. |
| | | * |
| | | * @param opId The operation id. |
| | | */ |
| | | private void getNextEligibleMessageForDomain(String opid) |
| | | private void computeNextEligibleMessageForDomain(String opId) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDNString() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "ctxt=" + toString()); |
| | | debugInfo(opId, "ctxt=" + this); |
| | | |
| | | assert(nextMsg == null); |
| | | try |
| | |
| | | // not eligible |
| | | if (nextNonEligibleMsg != null) |
| | | { |
| | | boolean hasBecomeEligible = |
| | | (nextNonEligibleMsg.getCSN().getTime() |
| | | <= eligibleCSN.getTime()); |
| | | final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " |
| | | + mh.getBaseDNString() |
| | | + " getNextEligibleMessageForDomain(" + opid + ") " |
| | | + " stored nonEligibleMsg " + nextNonEligibleMsg |
| | | + " has now become eligible regarding " |
| | | + " the eligibleCSN ("+ eligibleCSN |
| | | + " ):" + hasBecomeEligible); |
| | | debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg |
| | | + " has now become eligible regarding the eligibleCSN (" |
| | | + eligibleCSN + " ): " + hasBecomeEligible); |
| | | |
| | | if (hasBecomeEligible) |
| | | { |
| | | // it is now eligible |
| | | nextMsg = nextNonEligibleMsg; |
| | | nextNonEligibleMsg = null; |
| | | } |
| | |
| | | else |
| | | { |
| | | // Here comes a new message !!! |
| | | // non blocking |
| | | UpdateMsg newMsg; |
| | | do { |
| | | newMsg = mh.getNextMessage(false); |
| | | // when the replication changelog is trimmed, the last (latest) chg |
| | | // is left in the db (whatever its age), and we don't want this chg |
| | | // to be returned in the external changelog. |
| | | // So let's check if the chg time is older than the trim date |
| | | } while ((newMsg!=null) && |
| | | (newMsg.getCSN().getTime() < domainLatestTrimDate)); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " |
| | | + mh.getBaseDNString() |
| | | + " getNextEligibleMessageForDomain(" + opid + ") " |
| | | + " got new message : " |
| | | + " baseDN=[" + mh.getBaseDNString() |
| | | + "] [newMsg=" + newMsg + "]" + dumpState()); |
| | | |
| | | // in non blocking mode, return null when no more msg |
| | | if (newMsg != null) |
| | | final UpdateMsg newMsg = getNextMessage(); |
| | | if (newMsg == null) |
| | | { |
| | | boolean isEligible = (newMsg.getCSN().getTime() |
| | | <= eligibleCSN.getTime()); |
| | | return; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " |
| | | + mh.getBaseDNString() |
| | | + " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "newMsg isEligible=" + isEligible + " since " |
| | | + "newMsg=[" + newMsg.getCSN() |
| | | + " " + new Date(newMsg.getCSN().getTime()) |
| | | + "] eligibleCSN=[" + eligibleCSN |
| | | + " " + new Date(eligibleCSN.getTime())+"]" |
| | | debugInfo(opId, "got new message : [newMsg=" + newMsg + "] " |
| | | + dumpState()); |
| | | |
| | | if (isEligible) |
| | | { |
| | | nextMsg = newMsg; |
| | | } |
| | | else |
| | | { |
| | | nextNonEligibleMsg = newMsg; |
| | | } |
| | | final boolean isEligible = isEligible(newMsg); |
| | | |
| | | if (debugEnabled()) |
| | | debugInfo(opId, "newMsg isEligible=" + isEligible + " since " |
| | | + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=[" |
| | | + toString(eligibleCSN) + "] " + dumpState()); |
| | | |
| | | if (isEligible) |
| | | { |
| | | nextMsg = newMsg; |
| | | } |
| | | else |
| | | { |
| | | nextNonEligibleMsg = newMsg; |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isEligible(UpdateMsg msg) |
| | | { |
| | | return msg.getCSN().getTime() <= eligibleCSN.getTime(); |
| | | } |
| | | |
| | | private UpdateMsg getNextMessage() |
| | | { |
| | | while (true) |
| | | { |
| | | final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */); |
| | | |
| | | if (newMsg == null) |
| | | { // in non blocking mode, null means no more messages |
| | | return null; |
| | | } |
| | | else if (newMsg.getCSN().getTime() < domainLatestTrimDate) |
| | | { |
| | | // when the replication changelog is trimmed, the last (latest) chg |
| | | // is left in the db (whatever its age), and we don't want this chg |
| | | // to be returned in the external changelog. |
| | | // So let's check if the chg time is older than the trim date |
| | | return newMsg; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private String toString(CSN csn) |
| | | { |
| | | return csn + " " + asDate(csn); |
| | | } |
| | | |
| | | private void debugInfo(String opId, String message) |
| | | { |
| | | TRACER.debugInfo("In ECLServerHandler, for baseDN=" |
| | | + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId |
| | | + ") " + message); |
| | | } |
| | | |
| | | /** |
| | | * Unregister the handler from the DomainContext ReplicationDomain. |
| | | * @return Whether the handler has been unregistered with success. |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The global list of contexts by domain for the search currently processed. |
| | | */ |
| | | private DomainContext[] domainCtxts = new DomainContext[0]; |
| | | |
| | | private String clDomCtxtsToString(String msg) |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | |
| | | return buffer.toString(); |
| | | } |
| | | |
| | | private static int UNDEFINED_PHASE = 0; |
| | | private static int INIT_PHASE = 1; |
| | | private static int PERSISTENT_PHASE = 2; |
| | | |
| | | /** |
| | | * Starts this handler based on a start message received from remote server. |
| | | * @param inECLStartMsg The start msg provided by the remote server. |
| | |
| | | |
| | | /** |
| | | * Initialize the handler from a provided cookie value. |
| | | * @param crossDomainStartState The provided cookie value. |
| | | * @throws DirectoryException When an error is raised. |
| | | * |
| | | * @param providedCookie |
| | | * The provided cookie value. |
| | | * @throws DirectoryException |
| | | * When an error is raised. |
| | | */ |
| | | private void initializeCLSearchFromGenState(String crossDomainStartState) |
| | | private void initializeCLSearchFromCookie(String providedCookie) |
| | | throws DirectoryException |
| | | { |
| | | initializeChangelogDomainCtxts(crossDomainStartState, false); |
| | | this.draftCompat = false; |
| | | |
| | | initializeChangelogDomainCtxts(providedCookie, false); |
| | | } |
| | | |
| | | /** |
| | |
| | | // Creates the table that will contain the real-time info for each |
| | | // and every domain. |
| | | Set<DomainContext> tmpSet = new HashSet<DomainContext>(); |
| | | String missingDomains = ""; |
| | | for (Iterator<ReplicationServerDomain> iter = rs.getDomainIterator(); |
| | | iter.hasNext();) |
| | | final StringBuilder missingDomains = new StringBuilder(); |
| | | for (ReplicationServerDomain rsd : toIterable(rs.getDomainIterator())) |
| | | { |
| | | ReplicationServerDomain rsd = iter.next(); |
| | | |
| | | // skip the 'unreal' changelog domain |
| | | if (rsd == this.replicationServerDomain) |
| | | continue; |
| | |
| | | } |
| | | |
| | | // skip unused domains |
| | | if (rsd.getLatestServerState().isEmpty()) |
| | | final ServerState latestServerState = rsd.getLatestServerState(); |
| | | if (latestServerState.isEmpty()) |
| | | continue; |
| | | |
| | | // Creates the new domain context |
| | | DomainContext newDomainCtxt = new DomainContext(); |
| | | final DomainContext newDomainCtxt = new DomainContext(); |
| | | newDomainCtxt.active = true; |
| | | newDomainCtxt.rsd = rsd; |
| | | newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate(); |
| | |
| | | // Assign the start state for the domain |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = rsd.getLatestServerState().duplicate(); |
| | | newDomainCtxt.startState = latestServerState; |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDN()); |
| | | } |
| | | else |
| | |
| | | // when there is a cookie provided in the request, |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | missingDomains += (rsd.getBaseDN() + ":;"); |
| | | missingDomains.append(rsd.getBaseDN()).append(":;"); |
| | | continue; |
| | | } |
| | | else if (!newDomainCtxt.startState.isEmpty()) |
| | |
| | | } |
| | | } |
| | | |
| | | newDomainCtxt.stopState = rsd.getLatestServerState().duplicate(); |
| | | newDomainCtxt.stopState = latestServerState; |
| | | } |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | |
| | | - if the cookie contains a domain that is even not replicated |
| | | then this case need to be considered here in another loop. |
| | | */ |
| | | if (!startStatesFromProvidedCookie.isEmpty()) |
| | | if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains) |
| | | { |
| | | if (allowUnknownDomains) |
| | | for (DN providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | | for (DN providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | | } |
| | | |
| | | // Now do the final checking |
| | |
| | | // Initializes each and every domain with the next(first) eligible message |
| | | // from the domain. |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.getNextEligibleMessageForDomain(operationId); |
| | | domainCtxt.computeNextEligibleMessageForDomain(operationId); |
| | | |
| | | if (domainCtxt.nextMsg == null) |
| | | domainCtxt.active = false; |
| | |
| | | closeInitPhase(); |
| | | } |
| | | |
| | | /* TODO: From replication CSN |
| | | //-- |
| | | if (startCLMsg.getStartMode()==2) |
| | | { |
| | | if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN())) |
| | | return; |
| | | } |
| | | |
| | | //-- |
| | | if (startCLMsg.getStartMode()==4) |
| | | { |
| | | // to get the CL first and last |
| | | initializeCLDomCtxts(null); // from start |
| | | CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN(); |
| | | |
| | | try |
| | | { |
| | | // to get the CL first and last |
| | | // last rely on the crossDomainEligibleCSN thus must have been |
| | | // computed before |
| | | int[] limits = computeCLLimits(crossDomainEligibleCSN); |
| | | // Send the response |
| | | CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]); |
| | | session.publish(msg); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | try |
| | | { |
| | | session.publish( |
| | | new ErrorMsg( |
| | | replicationServer.getServerId(), |
| | | serverId, |
| | | Message.raw(Category.SYNC, Severity.INFORMATION, |
| | | "Exception raised: " + e.getMessage()))); |
| | | } |
| | | catch(IOException ioe) |
| | | { |
| | | // FIXME: close conn ? |
| | | } |
| | | } |
| | | return; |
| | | } |
| | | */ |
| | | |
| | | // Store into domain |
| | | registerIntoDomain(); |
| | | |
| | | if (debugEnabled()) |
| | |
| | | short requestType = msg.getECLRequestType(); |
| | | if (requestType == REQUEST_TYPE_FROM_COOKIE) |
| | | { |
| | | initializeCLSearchFromGenState(msg.getCrossDomainServerState()); |
| | | initializeCLSearchFromCookie(msg.getCrossDomainServerState()); |
| | | } |
| | | else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER) |
| | | { |
| | |
| | | } |
| | | |
| | | // Build the ECLUpdateMsg to be returned |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // cookie will be set later |
| | | oldestContext.rsd.getBaseDN(), |
| | | 0); // changeNumber may be set later |
| | | oldestContext.nextMsg = null; |
| | | final ECLUpdateMsg change = newECLUpdateMsg(oldestContext); |
| | | |
| | | // Default is not to loop, with one exception |
| | | continueLooping = false; |
| | |
| | | |
| | | // Set and test the domain of the oldestChange see if we reached |
| | | // the end of the phase for this domain |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getCSN()); |
| | | oldestContext.currentState.update(change.getUpdateMsg().getCSN()); |
| | | |
| | | if (oldestContext.currentState.cover(oldestContext.stopState) |
| | | || (draftCompat |
| | |
| | | { |
| | | // populates the table with the next eligible msg from iDom |
| | | // in non blocking mode, return null when no more eligible msg |
| | | oldestContext.getNextEligibleMessageForDomain(operationId); |
| | | oldestContext.computeNextEligibleMessageForDomain(operationId); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | |
| | | + "looking for the generalized oldest change")); |
| | | |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.getNextEligibleMessageForDomain(operationId); |
| | | domainCtxt.computeNextEligibleMessageForDomain(operationId); |
| | | } |
| | | |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | if (oldestContext != null) |
| | | { |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // set later |
| | | oldestContext.rsd.getBaseDN(), |
| | | 0); |
| | | oldestContext.nextMsg = null; // clean |
| | | |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getCSN()); |
| | | final ECLUpdateMsg change = newECLUpdateMsg(oldestContext); |
| | | oldestContext.currentState.update(change.getUpdateMsg().getCSN()); |
| | | |
| | | if (draftCompat) |
| | | { |
| | |
| | | return oldestChange; |
| | | } |
| | | |
| | | private ECLUpdateMsg newECLUpdateMsg(DomainContext ctx) |
| | | { |
| | | // cookie will be set later AND changeNumber may be set later |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) ctx.nextMsg, null, ctx.rsd.getBaseDN(), 0); |
| | | ctx.nextMsg = null; // clean after use |
| | | return change; |
| | | } |
| | | |
| | | /** |
| | | * Either retrieves a change number from the DB, or assign a new change number |
| | | * and store in the DB. |
| | |
| | | private boolean assignChangeNumber(final ECLUpdateMsg oldestChange) |
| | | throws ChangelogException |
| | | { |
| | | // We also need to check if the CNIndexDB is consistent with |
| | | // the changelogdb. |
| | | // if not, 2 potential reasons |
| | | // a/ : changelog has been purged (trim)let's traverse the CNIndexDB |
| | | // b/ : changelog is late .. let's traverse the changelogDb |
| | | // The following loop allows to loop until being on the same cn |
| | | // in the 2 dbs |
| | | // We also need to check if the CNIndexDB is consistent with the |
| | | // changelogDB. If not, 2 potential reasons: |
| | | // a/ changelog has been purged (trim) let's traverse the CNIndexDB |
| | | // b/ changelog is late ... let's traverse the changelogDb |
| | | // The following loop allows to loop until being on the same cn in the 2 dbs |
| | | |
| | | // replogCSN : the oldest change from the changelog db |
| | | CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN(); |
| | |
| | | final DN dnFromCNIndexDB = currentRecord.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " comparing the 2 db DNs :" + dnFromChangelogDb + "?=" |
| | | + csnFromChangelogDb + " timestamps:" |
| | | + new Date(csnFromChangelogDb.getTime()) + " ?older" |
| | | + new Date(csnFromCNIndexDB.getTime())); |
| | | |
| | | TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :" |
| | | + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:" |
| | | + asDate(csnFromChangelogDb) + " ?older" |
| | | + asDate(csnFromCNIndexDB)); |
| | | |
| | | if (areSameChange(csnFromChangelogDb, dnFromChangelogDb, |
| | | csnFromCNIndexDB, dnFromCNIndexDB)) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " assigning changeNumber=" + currentRecord.getChangeNumber() |
| | | + " to change=" + oldestChange); |
| | | TRACER.debugInfo("assignChangeNumber() assigning changeNumber=" |
| | | + currentRecord.getChangeNumber() + " to change=" + oldestChange); |
| | | |
| | | oldestChange.setChangeNumber(currentRecord.getChangeNumber()); |
| | | return true; |
| | |
| | | { |
| | | // the change from the changelogDb is older |
| | | // it should have been stored lately |
| | | // let's continue to traverse the changelogdb |
| | | // let's continue to traverse the changelogDB |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber(): will skip " |
| | | TRACER.debugInfo("assignChangeNumber() will skip " |
| | | + csnFromChangelogDb |
| | | + " and read next from the regular changelog."); |
| | | + " and read next change from the regular changelog."); |
| | | return false; // TO BE CHECKED |
| | | } |
| | | |
| | |
| | | try |
| | | { |
| | | // let's traverse the CNIndexDB searching for the change |
| | | // found in the changelogDb. |
| | | // found in the changelogDB |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " will skip " + csnFromCNIndexDB |
| | | TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB |
| | | + " and read next change from the CNIndexDB."); |
| | | |
| | | isEndOfCNIndexDBReached = !cnIndexDBCursor.next(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number has" |
| | | + "skipped to changeNumber=" + currentRecord.getChangeNumber() |
| | | + " csn=" + currentRecord.getCSN() + " End of CNIndexDB ?" |
| | | TRACER.debugInfo("assignChangeNumber() has skipped to changeNumber=" |
| | | + currentRecord.getChangeNumber() + " csn=" |
| | | + currentRecord.getCSN() + " End of CNIndexDB ?" |
| | | + isEndOfCNIndexDBReached); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | } |
| | | } |
| | | |
| | | private Date asDate(CSN csn) |
| | | { |
| | | return new Date(csn.getTime()); |
| | | } |
| | | |
| | | private boolean areSameChange(CSN csn1, DN dn1, CSN csn2, DN dn2) |
| | | { |
| | | boolean sameDN = dn1.compareTo(dn2) == 0; |