| | |
| | | /** |
| | | * Specifies the last draft change number (seqnum) requested. |
| | | */ |
| | | public int lastDraftCN = 0; |
| | | private int lastDraftCN = 0; |
| | | /** |
| | | * Specifies whether the draft change number (seqnum) db has been read until |
| | | * its end. |
| | | */ |
| | | public boolean isEndOfDraftCNReached = false; |
| | | private boolean isEndOfDraftCNReached = false; |
| | | /** |
| | | * Specifies whether the current search has been requested to be persistent |
| | | * or not. |
| | | */ |
| | | public short isPersistent; |
| | | private short isPersistent; |
| | | /** |
| | | * Specifies the current search phase : INIT or PERSISTENT. |
| | | */ |
| | | public int searchPhase = INIT_PHASE; |
| | | private int searchPhase = INIT_PHASE; |
| | | /** |
| | | * Specifies the cookie contained in the request, specifying where |
| | | * to start serving the ECL. |
| | | */ |
| | | public String startCookie; |
| | | private String startCookie; |
| | | /** |
| | | * Specifies the value of the cookie before the change currently processed |
| | | * is returned. It is updated with the change number of the change |
| | | * currently processed (thus becoming the "current" cookie just |
| | | * before the change is returned). |
| | | */ |
| | | public MultiDomainServerState previousCookie = |
| | | private MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(); |
| | | /** |
| | | * Specifies the excluded DNs (like cn=admin, ...). |
| | | */ |
| | | public Set<String> excludedServiceIDs = new HashSet<String>(); |
| | | private Set<String> excludedBaseDNs = new HashSet<String>(); |
| | | |
| | | /** |
| | | * Eligible changeNumber - only changes older or equal to eligibleCN |
| | | * are published in the ECL. |
| | | */ |
| | | public ChangeNumber eligibleCN = null; |
| | | private ChangeNumber eligibleCN = null; |
| | | |
| | | /** |
| | | * Provides a string representation of this object. |
| | |
| | | */ |
| | | private class DomainContext |
| | | { |
| | | ReplicationServerDomain rsd; |
| | | private ReplicationServerDomain rsd; |
| | | |
| | | boolean active; // active when there are still changes |
| | | // supposed eligible for the ECL. |
| | | /** |
| | | * Active when there are still changes that are supposed to be eligible for the ECL. |
| | | */ |
| | | private boolean active; |
| | | |
| | | MessageHandler mh; // the message handler from which are read |
| | | // the changes for this domain |
| | | /** |
| | | * The message handler from which the changes for this domain are read. |
| | | */ |
| | | private MessageHandler mh; |
| | | private UpdateMsg nextMsg; |
| | | private UpdateMsg nextNonEligibleMsg; |
| | | ServerState startState; |
| | | ServerState currentState; |
| | | ServerState stopState; |
| | | long domainLatestTrimDate; |
| | | private ServerState startState; |
| | | private ServerState currentState; |
| | | private ServerState stopState; |
| | | private long domainLatestTrimDate; |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | */ |
| | | public void toString(StringBuilder buffer) |
| | | { |
| | | buffer.append("[ [active=").append(active).append("] [rsd=") |
| | | .append(rsd).append("] [nextMsg=").append(nextMsg).append("(") |
| | | buffer.append("[ [active=").append(active) |
| | | .append("] [rsd=").append(rsd) |
| | | .append("] [nextMsg=").append(nextMsg).append("(") |
| | | .append(nextMsg != null ? |
| | | new Date(nextMsg.getChangeNumber().getTime()).toString():"") |
| | | .append(")") |
| | | .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg) |
| | | .append("] [startState=").append(startState).append("] [stopState=") |
| | | .append(stopState).append("] [currentState=").append(currentState) |
| | | .append("] [startState=").append(startState) |
| | | .append("] [stopState=").append(stopState) |
| | | .append("] [currentState=").append(currentState) |
| | | .append("]]"); |
| | | } |
| | | |
| | |
| | | private void getNextEligibleMessageForDomain(String opid) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "ctxt=" + toString()); |
| | | |
| | |
| | | <= eligibleCN.getTime()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + " stored nonEligibleMsg " + nextNonEligibleMsg |
| | | + " has now become eligible regarding " |
| | |
| | | (newMsg.getChangeNumber().getTime() < domainLatestTrimDate)); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() + |
| | | " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + " got new message : " |
| | | + " serviceId=[" + mh.getServiceId() |
| | | + " serviceId=[" + mh.getBaseDN() |
| | | + "] [newMsg=" + newMsg + "]" + dumpState()); |
| | | |
| | | // in non blocking mode, return null when no more msg |
| | |
| | | <= eligibleCN.getTime()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() |
| | | TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() |
| | | + " getNextEligibleMessageForDomain(" + opid+ ") " |
| | | + "newMsg isEligible=" + isEligible + " since " |
| | | + "newMsg=[" + newMsg.getChangeNumber() |
| | |
| | | } |
| | | } |
| | | |
| | | // The global list of contexts by domain for the search currently processed. |
| | | DomainContext[] domainCtxts = new DomainContext[0]; |
| | | /** |
| | | * The global list of contexts by domain for the search currently processed. |
| | | */ |
| | | private DomainContext[] domainCtxts = new DomainContext[0]; |
| | | |
| | | private String clDomCtxtsToString(String msg) |
| | | { |
| | |
| | | return buffer.toString(); |
| | | } |
| | | |
| | | static int UNDEFINED_PHASE = 0; |
| | | static int INIT_PHASE = 1; |
| | | static int PERSISTENT_PHASE = 2; |
| | | private static int UNDEFINED_PHASE = 0; |
| | | private static int INIT_PHASE = 1; |
| | | private static int PERSISTENT_PHASE = 2; |
| | | |
| | | /** |
| | | * Starts this handler based on a start message received from remote server. |
| | |
| | | { |
| | | // Peer DS uses protocol < V4 : send it a ReplServerStartMsg |
| | | startMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, getServiceId(), maxRcvWindow, |
| | | replicationServerURL, getBaseDN(), maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, getLocalGroupId(), |
| | | replicationServerDomain.getReplicationServer() |
| | |
| | | { |
| | | // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg |
| | | startMsg = new ReplServerStartDSMsg(replicationServerId, |
| | | replicationServerURL, getServiceId(), maxRcvWindow, |
| | | replicationServerURL, getBaseDN(), maxRcvWindow, |
| | | new ServerState(), localGenerationId, sslEncryption, |
| | | getLocalGroupId(), 0, replicationServer.getWeight(), 0); |
| | | } |
| | |
| | | replicationServer, rcvWindowSize); |
| | | try |
| | | { |
| | | setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | | setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // no chance to have a bad domain set here |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | replicationServer, 0); |
| | | try |
| | | { |
| | | setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | | setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | // the operation) in order to have the first and possible last |
| | | // DraftCN. |
| | | int[] limits = replicationServer.getECLDraftCNLimits( |
| | | eligibleCN, excludedServiceIDs); |
| | | eligibleCN, excludedBaseDNs); |
| | | |
| | | // If the startDraftCN provided is lower than the first Draft CN in |
| | | // the DB, let's use the lower limit. |
| | |
| | | continue; |
| | | |
| | | // skip the excluded domains |
| | | if (excludedServiceIDs.contains(rsd.getBaseDn())) |
| | | if (excludedBaseDNs.contains(rsd.getBaseDn())) |
| | | { |
| | | // this is an excluded domain |
| | | if (allowUnknownDomains) |
| | |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, |
| | | replicationServerURL, replicationServerId, replicationServer); |
| | | // set initial state |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | // set serviceID and domain |
| | | mh.setServiceIdAndDomain(rsd.getBaseDn(), false); |
| | | mh.setBaseDNAndDomain(rsd.getBaseDn(), false); |
| | | // register the unconnected into the domain |
| | | rsd.registerHandler(mh); |
| | | newDomainCtxt.mh = mh; |
| | |
| | | String localString; |
| | | localString = "External changelog Server "; |
| | | if (this.serverId != 0) |
| | | localString += serverId + " " + serverURL + " " + getServiceId() |
| | | localString += serverId + " " + serverURL + " " + getBaseDN() |
| | | + " " + this.getOperationId(); |
| | | else |
| | | localString += this.getClass().getCanonicalName()+ " " + operationId; |
| | |
| | | ERR_INVALID_COOKIE_SYNTAX.get()); |
| | | } |
| | | |
| | | excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs(); |
| | | replicationServer.disableEligibility(excludedServiceIDs); |
| | | excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); |
| | | replicationServer.disableEligibility(excludedBaseDNs); |
| | | eligibleCN = replicationServer.getEligibleCN(); |
| | | |
| | | if (startECLSessionMsg.getECLRequestType()== |
| | |
| | | |
| | | // the next change from the DraftCN db |
| | | ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber(); |
| | | String dnFromDraftCNDb = draftCNDbIter.getServiceID(); |
| | | String dnFromDraftCNDb = draftCNDbIter.getBaseDN(); |
| | | |
| | | // should replogcn and DraftCNcn be the same change? |
| | | int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb); |
| | |
| | | draftCNDb.add( |
| | | oldestChange.getDraftChangeNumber(), |
| | | previousCookie.toString(), |
| | | oldestChange.getServiceId(), |
| | | oldestChange.getBaseDN(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | break; |
| | |
| | | |
| | | // Update the current state |
| | | previousCookie.update( |
| | | oldestChange.getServiceId(), |
| | | oldestChange.getBaseDN(), |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | // Set the current value of global state in the returned message |
| | |
| | | writer = new ECLServerWriter(session,this,replicationServerDomain); |
| | | writer.start(); // start suspended |
| | | } |
| | | |
| | | } |
| | | else |
| | | { |