| | |
| | | import java.util.*; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | } |
| | | } |
| | | |
| | | private String clDomCtxtsToString(String msg) |
| | | private String domaimCtxtsToString(String msg) |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | buffer.append(msg).append("\n"); |
| | |
| | | |
| | | /** |
| | | * Wait receiving the StartSessionMsg from the remote DS and process it. |
| | | * |
| | | * @return the startSessionMsg received |
| | | * @throws DirectoryException |
| | | * @throws IOException |
| | | * @throws ClassNotFoundException |
| | | * @throws DataFormatException |
| | | * @throws NotSupportedOldVersionPDUException |
| | | * @throws Exception |
| | | */ |
| | | private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer() |
| | | throws DirectoryException, IOException, ClassNotFoundException, |
| | | DataFormatException, |
| | | NotSupportedOldVersionPDUException |
| | | throws Exception |
| | | { |
| | | ReplicationMsg msg = session.receive(); |
| | | |
| | |
| | | } |
| | | else if (!(msg instanceof StartECLSessionMsg)) |
| | | { |
| | | Message message = Message |
| | | .raw("Protocol error: StartECLSessionMsg required." + msg |
| | | + " received."); |
| | | Message message = Message.raw( |
| | | "Protocol error: StartECLSessionMsg required." + msg + " received."); |
| | | abortStart(message); |
| | | return null; |
| | | } |
| | |
| | | * @param startChangeNumber |
| | | * the start change number coming from the request filter. |
| | | * @return the cookie corresponding to the passed in startChangeNumber. |
| | | * @throws Exception |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | * @throws DirectoryException |
| | | * if a database problem occurred |
| | |
| | | private void initializeChangelogDomainCtxts(String providedCookie, |
| | | boolean allowUnknownDomains) throws DirectoryException |
| | | { |
| | | /* |
| | | This map is initialized from the providedCookie. |
| | | Below, it will be traversed and each domain configured with ECL will be |
| | | checked and removed from the map. |
| | | At the end, normally the map should be empty. |
| | | Depending on allowUnknownDomains provided flag, a non empty map will |
| | | be considered as an error when allowUnknownDomains is false. |
| | | */ |
| | | Map<DN, ServerState> startStatesFromProvidedCookie = |
| | | new HashMap<DN, ServerState>(); |
| | | |
| | | ReplicationServer rs = this.replicationServer; |
| | | |
| | | // Parse the provided cookie and overwrite startState from it. |
| | | if ((providedCookie != null) && (providedCookie.length()!=0)) |
| | | startStatesFromProvidedCookie = |
| | | MultiDomainServerState.splitGenStateToServerStates(providedCookie); |
| | | |
| | | try |
| | | { |
| | | // Creates the table that will contain the real-time info for each |
| | | // and every domain. |
| | | final Set<DomainContext> tmpSet = new HashSet<DomainContext>(); |
| | | final StringBuilder missingDomains = new StringBuilder(); |
| | | for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator())) |
| | | { |
| | | // skip the 'unreal' changelog domain |
| | | if (domain == this.replicationServerDomain) |
| | | continue; |
| | | |
| | | // skip the excluded domains |
| | | if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString())) |
| | | { |
| | | // this is an excluded domain |
| | | if (allowUnknownDomains) |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | continue; |
| | | } |
| | | |
| | | // skip unused domains |
| | | final ServerState latestServerState = domain.getLatestServerState(); |
| | | if (latestServerState.isEmpty()) |
| | | continue; |
| | | |
| | | // Creates the new domain context |
| | | final DomainContext newDomainCtxt = new DomainContext(); |
| | | newDomainCtxt.active = true; |
| | | newDomainCtxt.rsDomain = domain; |
| | | newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate(); |
| | | |
| | | // Assign the start state for the domain |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = latestServerState; |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | } |
| | | else |
| | | { |
| | | // let's take the start state for this domain from the provided |
| | | // cookie |
| | | newDomainCtxt.startState = |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | |
| | | if (providedCookie == null |
| | | || providedCookie.length() == 0 |
| | | || allowUnknownDomains) |
| | | { |
| | | // when there is no cookie provided in the request, |
| | | // let's start traversing this domain from the beginning of |
| | | // what we have in the replication changelog |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | CSN latestTrimCSN = |
| | | new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0); |
| | | newDomainCtxt.startState = |
| | | domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // when there is a cookie provided in the request, |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | missingDomains.append(domain.getBaseDN()).append(":;"); |
| | | continue; |
| | | } |
| | | else if (!newDomainCtxt.startState.isEmpty()) |
| | | { |
| | | if (hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState)) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | newDomainCtxt.rsDomain.getBaseDN().toNormalizedString())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | newDomainCtxt.stopState = latestServerState; |
| | | } |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer); |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | mh.setBaseDNAndDomain(domain.getBaseDN(), false); |
| | | // register the unconnected into the domain |
| | | domain.registerHandler(mh); |
| | | newDomainCtxt.mh = mh; |
| | | |
| | | previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(), |
| | | newDomainCtxt.startState); |
| | | |
| | | // store the new context |
| | | tmpSet.add(newDomainCtxt); |
| | | } |
| | | |
| | | if (missingDomains.length()>0) |
| | | { |
| | | // If there are domain missing in the provided cookie, |
| | | // the request is rejected and a full resync is required. |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | missingDomains, |
| | | "<" + providedCookie + missingDomains + ">")); |
| | | } |
| | | |
| | | domainCtxts = tmpSet; |
| | | |
| | | /* |
| | | When it is valid to have the provided cookie containing unknown domains |
| | | (allowUnknownDomains is true), 2 cases must be considered : |
| | | - if the cookie contains a domain that is replicated but where |
| | | ECL is disabled, then this case is considered above |
| | | - if the cookie contains a domain that is even not replicated |
| | | then this case need to be considered here in another loop. |
| | | */ |
| | | if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains) |
| | | { |
| | | for (DN providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | | } |
| | | |
| | | // Now do the final checking |
| | | if (!startStatesFromProvidedCookie.isEmpty()) |
| | | { |
| | | /* |
| | | After reading all the known domains from the provided cookie, there |
| | | is one (or several) domain that are not currently configured. |
| | | This domain has probably been removed or replication disabled on it. |
| | | The request is rejected and full resync is required. |
| | | */ |
| | | StringBuilder sb = new StringBuilder(); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | sb.append(domainCtxt.rsDomain.getBaseDN()).append(":") |
| | | .append(domainCtxt.startState).append(";"); |
| | | } |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | startStatesFromProvidedCookie.toString() ,sb.toString())); |
| | | } |
| | | |
| | | // the next record from the CNIndexDB should be the one |
| | | domainCtxts = buildDomainContexts(providedCookie, allowUnknownDomains); |
| | | startCookie = providedCookie; |
| | | |
| | | // Initializes each and every domain with the next(first) eligible message |
| | |
| | | e); |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | " initializeCLDomCtxts ends with " + " " + dumpState()); |
| | | TRACER.debugInfo("initializeChangelogDomainCtxts() ends with " |
| | | + dumpState()); |
| | | } |
| | | |
| | | private Set<DomainContext> buildDomainContexts(String providedCookie, |
| | | boolean allowUnknownDomains) throws DirectoryException |
| | | { |
| | | final Set<DomainContext> results = new HashSet<DomainContext>(); |
| | | final ReplicationServer rs = this.replicationServer; |
| | | |
| | | /* |
| | | This map is initialized from the providedCookie. |
| | | Below, it will be traversed and each domain configured with ECL will be |
| | | checked and removed from the map. |
| | | At the end, normally the map should be empty. |
| | | Depending on allowUnknownDomains provided flag, a non empty map will |
| | | be considered as an error when allowUnknownDomains is false. |
| | | */ |
| | | final Map<DN, ServerState> startStatesFromProvidedCookie = |
| | | MultiDomainServerState.splitGenStateToServerStates(providedCookie); |
| | | |
| | | final StringBuilder missingDomains = new StringBuilder(); |
| | | for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator())) |
| | | { |
| | | // skip the 'unreal' changelog domain |
| | | if (domain == this.replicationServerDomain) |
| | | continue; |
| | | |
| | | // skip the excluded domains |
| | | if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString())) |
| | | { |
| | | // this is an excluded domain |
| | | if (allowUnknownDomains) |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | continue; |
| | | } |
| | | |
| | | // skip unused domains |
| | | final ServerState latestServerState = domain.getLatestServerState(); |
| | | if (latestServerState.isEmpty()) |
| | | continue; |
| | | |
| | | |
| | | // Creates the new domain context |
| | | final DomainContext newDomainCtxt = new DomainContext(); |
| | | newDomainCtxt.active = true; |
| | | newDomainCtxt.rsDomain = domain; |
| | | newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate(); |
| | | |
| | | // Assign the start state for the domain |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = latestServerState; |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | } |
| | | else |
| | | { |
| | | // let's take the start state for this domain from the provided |
| | | // cookie |
| | | newDomainCtxt.startState = |
| | | startStatesFromProvidedCookie.remove(domain.getBaseDN()); |
| | | |
| | | if (providedCookie == null || providedCookie.length() == 0 |
| | | || allowUnknownDomains) |
| | | { |
| | | // when there is no cookie provided in the request, |
| | | // let's start traversing this domain from the beginning of |
| | | // what we have in the replication changelog |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | CSN latestTrimCSN = |
| | | new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0); |
| | | newDomainCtxt.startState = |
| | | domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // when there is a cookie provided in the request, |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | missingDomains.append(domain.getBaseDN()).append(":;"); |
| | | continue; |
| | | } |
| | | else if (!newDomainCtxt.startState.isEmpty() |
| | | && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState)) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | newDomainCtxt.rsDomain.getBaseDN().toNormalizedString())); |
| | | } |
| | | } |
| | | |
| | | newDomainCtxt.stopState = latestServerState; |
| | | } |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer); |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | mh.setBaseDNAndDomain(domain.getBaseDN(), false); |
| | | // register the unconnected into the domain |
| | | domain.registerHandler(mh); |
| | | newDomainCtxt.mh = mh; |
| | | |
| | | previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(), |
| | | newDomainCtxt.startState); |
| | | |
| | | results.add(newDomainCtxt); |
| | | } |
| | | |
| | | if (missingDomains.length()>0) |
| | | { |
| | | // If there are domain missing in the provided cookie, |
| | | // the request is rejected and a full resync is required. |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | missingDomains, |
| | | "<" + providedCookie + missingDomains + ">")); |
| | | } |
| | | |
| | | /* |
| | | When it is valid to have the provided cookie containing unknown domains |
| | | (allowUnknownDomains is true), 2 cases must be considered : |
| | | - if the cookie contains a domain that is replicated but where |
| | | ECL is disabled, then this case is considered above |
| | | - if the cookie contains a domain that is even not replicated |
| | | then this case need to be considered here in another loop. |
| | | */ |
| | | if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains) |
| | | { |
| | | for (DN providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | | } |
| | | |
| | | // Now do the final checking |
| | | if (!startStatesFromProvidedCookie.isEmpty()) |
| | | { |
| | | /* |
| | | After reading all the known domains from the provided cookie, there |
| | | is one (or several) domain that are not currently configured. |
| | | This domain has probably been removed or replication disabled on it. |
| | | The request is rejected and full resync is required. |
| | | */ |
| | | StringBuilder sb = new StringBuilder(); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | sb.append(domainCtxt.rsDomain.getBaseDN()).append(":") |
| | | .append(domainCtxt.startState).append(";"); |
| | | } |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | startStatesFromProvidedCookie.toString() ,sb.toString())); |
| | | } |
| | | |
| | | return results; |
| | | } |
| | | |
| | | private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain, |
| | |
| | | try |
| | | { |
| | | // Disable timeout for next communications |
| | | // FIXME: why? and where is it reset? |
| | | session.setSoTimeout(0); |
| | | } |
| | | catch(Exception e) { /* do nothing */ } |
| | |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | // create reader |
| | | reader = new ServerReader(session, this); |
| | | reader.start(); |
| | | |
| | | if (writer == null) |
| | | { |
| | | // create writer |
| | | writer = new ECLServerWriter(session,this,replicationServerDomain); |
| | | writer.start(); |
| | | } |
| | | |
| | | // Resume the writer |
| | | ((ECLServerWriter)writer).resumeWriter(); |
| | | |
| | | // TODO:ECL Potential race condition if writer not yet resumed here |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId |
| | | + " initialized: " + " " + dumpState() + " " + " " |
| | | + clDomCtxtsToString("")); |
| | | + domaimCtxtsToString("")); |
| | | } |
| | | |
| | | private void initializeChangelogSearch(StartECLSessionMsg msg) |
| | |
| | | @Override |
| | | protected UpdateMsg getNextMessage(boolean synchronous) |
| | | { |
| | | UpdateMsg msg = null; |
| | | try |
| | | { |
| | | ECLUpdateMsg eclMsg = getNextECLUpdate(); |
| | | if (eclMsg!=null) |
| | | msg = eclMsg.getUpdateMsg(); |
| | | if (eclMsg != null) |
| | | return eclMsg.getUpdateMsg(); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, de); |
| | | } |
| | | return msg; |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | |
| | | while (continueLooping && searchPhase == INIT_PHASE) |
| | | { |
| | | // Step 1 & 2 |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | final DomainContext oldestContext = findDomainCtxtWithOldestChange(); |
| | | if (oldestContext == null) |
| | | { // there is no oldest change to process |
| | | closeInitPhase(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | // Build the ECLUpdateMsg to be returned |
| | | final ECLUpdateMsg change = newECLUpdateMsg(oldestContext); |
| | | |
| | | // Default is not to loop, with one exception |
| | |
| | | } |
| | | if (oldestContext.active) |
| | | { |
| | | // populates the table with the next eligible msg from iDom |
| | | // in non blocking mode, return null when no more eligible msg |
| | | oldestContext.computeNextEligibleMessageForDomain(operationId); |
| | | } |
| | | oldestChange = change; |
| | |
| | | if (searchPhase == PERSISTENT_PHASE) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(clDomCtxtsToString( |
| | | TRACER.debugInfo(domaimCtxtsToString( |
| | | "In getNextECLUpdate (persistent): " |
| | | + "looking for the generalized oldest change")); |
| | | |
| | |
| | | domainCtxt.computeNextEligibleMessageForDomain(operationId); |
| | | } |
| | | |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | final DomainContext oldestContext = findDomainCtxtWithOldestChange(); |
| | | if (oldestContext != null) |
| | | { |
| | | final ECLUpdateMsg change = newECLUpdateMsg(oldestContext); |
| | | oldestContext.currentState.update(change.getUpdateMsg().getCSN()); |
| | | |
| | | if (draftCompat) |
| | | { |
| | | assignNewChangeNumberAndStore(change); |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn); |
| | | |
| | | // Update the current state |
| | | previousCookie.update(oldestChange.getBaseDN(), csn); |
| | | |
| | | // Set the current value of global state in the returned message |
| | | oldestChange.setCookie(previousCookie); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate returns result oldestChange=" |
| | | + oldestChange); |
| | | |
| | | } |
| | | return oldestChange; |
| | | } |
| | |
| | | return true; |
| | | } |
| | | |
| | | |
| | | // the next change from the CNIndexDB |
| | | final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord(); |
| | | final CSN csnFromCNIndexDB = currentRecord.getCSN(); |
| | | final DN dnFromCNIndexDB = currentRecord.getBaseDN(); |
| | |
| | | searchPhase = UNDEFINED_PHASE; |
| | | } |
| | | |
| | | // End of INIT_PHASE => always release the iterator |
| | | // End of INIT_PHASE => always release the cursor |
| | | releaseCursor(); |
| | | } |
| | | |
| | |
| | | * @return the domainCtxt of the domain with the oldest change, null when |
| | | * none. |
| | | */ |
| | | private DomainContext findOldestChangeFromDomainCtxts() |
| | | private DomainContext findDomainCtxtWithOldestChange() |
| | | { |
| | | DomainContext oldestCtxt = null; |
| | | for (DomainContext domainCtxt : domainCtxts) |
| | | { |
| | | if (domainCtxt.active |
| | | // .msg is null when the previous (non blocking) nextMessage did |
| | | // .nextMsg is null when the previous (non blocking) nextMessage did |
| | | // not have any eligible msg to return |
| | | && domainCtxt.nextMsg != null |
| | | && (oldestCtxt == null |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In cn=changelog," + this |
| | | + " getOldestChangeFromDomainCtxts() returns " |
| | | + " findDomainCtxtWithOldestChange() returns " |
| | | + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1")); |
| | | |
| | | return oldestCtxt; |