| | |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | | * peer replication server. |
| | |
| | | { |
| | | this.draftCompat = true; |
| | | |
| | | // Any possible optimization on draft CN in the request filter ? |
| | | final String providedCookie = findCookie(startDraftCN); |
| | | initializeChangelogDomainCtxts(providedCookie, true); |
| | | } |
| | |
| | | */ |
| | | public ECLUpdateMsg getNextECLUpdate() throws DirectoryException |
| | | { |
| | | ECLUpdateMsg oldestChange = null; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In cn=changelog" + this + |
| | | " getNextECLUpdate starts: " + dumpState()); |
| | | |
| | | ECLUpdateMsg oldestChange = null; |
| | | try |
| | | { |
| | | |
| | | // getMessage: |
| | | // get the oldest msg: |
| | | // after: |
| | |
| | | while (continueLooping && searchPhase == INIT_PHASE) |
| | | { |
| | | // Step 1 & 2 |
| | | if (searchPhase == INIT_PHASE) |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | if (oldestContext == null) |
| | | { // there is no oldest change to process |
| | | closeInitPhase(); |
| | | |
| | | // signals end of phase 1 to the caller |
| | | return null; |
| | | } |
| | | |
| | | // Build the ECLUpdateMsg to be returned |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // cookie will be set later |
| | | oldestContext.rsd.getBaseDn(), |
| | | 0); // draftChangeNumber may be set later |
| | | oldestContext.nextMsg = null; |
| | | |
| | | // Default is not to loop, with one exception |
| | | continueLooping = false; |
| | | if (draftCompat) |
| | | { |
| | | // Default is not to loop, with one exception |
| | | continueLooping = false; |
| | | continueLooping = !assignDraftCN(change); |
| | | } |
| | | |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | if (oldestContext == null) |
| | | { // there is no oldest change to process |
| | | closeInitPhase(); |
| | | // here we have the right oldest change |
| | | // and in the draft case, we have its draft changenumber |
| | | |
| | | // signals end of phase 1 to the caller |
| | | return null; |
| | | } |
| | | // Set and test the domain of the oldestChange see if we reached |
| | | // the end of the phase for this domain |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getChangeNumber()); |
| | | |
| | | // Build the ECLUpdateMsg to be returned |
| | | String suffix = oldestContext.rsd.getBaseDn(); |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // cookie will be set later |
| | | suffix, |
| | | 0); // draftChangeNumber may be set later |
| | | oldestContext.nextMsg = null; |
| | | |
| | | if (draftCompat) |
| | | { |
| | | // either retrieve a draftCN from the draftCNDb |
| | | // or assign a new draftCN and store in the db |
| | | |
| | | DraftCNDbHandler draftCNDb=replicationServer.getDraftCNDbHandler(); |
| | | |
| | | // We also need to check whether the draftCNDb is consistent with |
| | | // the changelogDb. |
| | | // If it is not, there are 2 potential reasons: |
| | | // a/ : the changelog has been purged (trim): let's traverse the draftCNDb |
| | | // b/ : the changelog is late: let's traverse the changelogDb |
| | | // The following loop iterates until both dbs are positioned on the |
| | | // same cn |
| | | |
| | | // replogcn : the oldest change from the changelog db |
| | | ChangeNumber cnFromChangelogDb = |
| | | oldestChange.getUpdateMsg().getChangeNumber(); |
| | | String dnFromChangelogDb = suffix; |
| | | |
| | | while (true) |
| | | { |
| | | if (!isEndOfDraftCNReached) |
| | | { |
| | | // we did not reach yet the end of the DraftCNdb |
| | | |
| | | // the next change from the DraftCN db |
| | | ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber(); |
| | | String dnFromDraftCNDb = draftCNDbIter.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " comparing the 2 db DNs :" |
| | | + dnFromChangelogDb + "?=" + cnFromChangelogDb |
| | | + " timestamps:" + new Date(cnFromChangelogDb.getTime()) |
| | | + " ?older" + new Date(cnFromDraftCNDb.getTime())); |
| | | |
| | | // should replogcn and DraftCN be the same change ? |
| | | if (areSameChange(cnFromChangelogDb, dnFromChangelogDb, |
| | | cnFromDraftCNDb, dnFromDraftCNDb)) |
| | | { |
| | | // same domain and same CN => same change |
| | | |
| | | // assign the DraftCN found to the change from the changelogdb |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " assigning draftCN=" + draftCNDbIter.getDraftCN() |
| | | + " to change=" + oldestChange); |
| | | |
| | | oldestChange.setDraftChangeNumber(draftCNDbIter.getDraftCN()); |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // replogcn and DraftCNcn are NOT on the same change |
| | | if (cnFromDraftCNDb.older(cnFromChangelogDb)) |
| | | { |
| | | // the change from the DraftCNDb is older: |
| | | // that means that the change has been purged from the |
| | | // changelogDb (and the DraftCNdb has not yet been trimmed) |
| | | |
| | | try |
| | | { |
| | | // let's traverse the DraftCNdb searching for the change |
| | | // found in the changelogDb. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " will skip " + cnFromDraftCNDb |
| | | + " and read next change from the DraftCNDb."); |
| | | |
| | | isEndOfDraftCNReached = !draftCNDbIter.next(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " has skiped to " |
| | | + " sn=" + draftCNDbIter.getDraftCN() |
| | | + " cn=" + draftCNDbIter.getChangeNumber() |
| | | + " End of draftCNDb ?"+isEndOfDraftCNReached); |
| | | |
| | | if (isEndOfDraftCNReached) |
| | | { |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | storeNewChange(draftCNDb, oldestChange, oldestChange |
| | | .getBaseDN()); |
| | | break; |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // TODO: At least log a warning |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // the change from the changelogDb is older |
| | | // it should have been stored lately |
| | | // let's continue to traverse the changelogdb |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate: will skip " |
| | | + cnFromChangelogDb |
| | | + " and read next from the regular changelog."); |
| | | continueLooping = true; |
| | | break; // TO BE CHECKED |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | storeNewChange(draftCNDb, oldestChange, suffix); |
| | | break; |
| | | } |
| | | } // while DraftCN |
| | | } // if draftCompat |
| | | |
| | | // here we have the right oldest change |
| | | // and in the draft case, we have its draft changenumber |
| | | |
| | | // Set and test the domain of the oldestChange see if we reached |
| | | // the end of the phase for this domain |
| | | oldestContext.currentState.update( |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | if (oldestContext.currentState.cover(oldestContext.stopState)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | | if (draftCompat && (lastDraftCN>0) && |
| | | (oldestChange.getDraftChangeNumber()>lastDraftCN)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | | if (oldestContext.active) |
| | | { |
| | | // populates the table with the next eligible msg from iDom |
| | | // in non blocking mode, return null when no more eligible msg |
| | | oldestContext.getNextEligibleMessageForDomain(operationId); |
| | | } |
| | | } // phase == INIT_PHASE |
| | | } // while (...) |
| | | if (oldestContext.currentState.cover(oldestContext.stopState)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | | if (draftCompat && (lastDraftCN>0) && |
| | | (change.getDraftChangeNumber()>lastDraftCN)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | | if (oldestContext.active) |
| | | { |
| | | // populates the table with the next eligible msg from iDom |
| | | // in non blocking mode, return null when no more eligible msg |
| | | oldestContext.getNextEligibleMessageForDomain(operationId); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | | |
| | | if (searchPhase == PERSISTENT_PHASE) |
| | | { |
| | |
| | | DomainContext oldestContext = findOldestChangeFromDomainCtxts(); |
| | | if (oldestContext != null) |
| | | { |
| | | String suffix = oldestContext.rsd.getBaseDn(); |
| | | |
| | | oldestChange = new ECLUpdateMsg( |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // set later |
| | | suffix, 0); |
| | | oldestContext.rsd.getBaseDn(), |
| | | 0); |
| | | oldestContext.nextMsg = null; // clean |
| | | |
| | | oldestContext.currentState.update( |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | change.getUpdateMsg().getChangeNumber()); |
| | | |
| | | if (draftCompat) |
| | | { |
| | | // should generate DraftCN |
| | | DraftCNDbHandler draftCNDb =replicationServer.getDraftCNDbHandler(); |
| | | storeNewChange(draftCNDb, oldestChange, suffix); |
| | | assignNewDraftCNAndStore(change); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | | } |
| | | } |
| | |
| | | return oldestChange; |
| | | } |
| | | |
| | | /** |
| | | * Either retrieves a draftCN from the draftCNDb, or assign a new draftCN and |
| | | * store in the db. |
| | | * |
| | | * @param oldestChange |
| | | * the oldestChange where to assign the draftCN |
| | | * @return <code>true</code> if a draftCN has been assigned to the provided |
| | | * oldestChange, <code>false</code> otherwise |
| | | * @throws DirectoryException |
| | | * if any problem occur |
| | | */ |
| | | private boolean assignDraftCN(final ECLUpdateMsg oldestChange) |
| | | throws DirectoryException |
| | | { |
| | | // We also need to check if the draftCNdb is consistent with |
| | | // the changelogdb. |
| | | // if not, 2 potential reasons |
| | | // a/ : changelog has been purged (trim)let's traverse the draftCNDb |
| | | // b/ : changelog is late .. let's traverse the changelogDb |
| | | // The following loop allows to loop until being on the same cn |
| | | // in the 2 dbs |
| | | |
| | | // replogcn : the oldest change from the changelog db |
| | | ChangeNumber cnFromChangelogDb = |
| | | oldestChange.getUpdateMsg().getChangeNumber(); |
| | | String dnFromChangelogDb = oldestChange.getBaseDN(); |
| | | |
| | | while (true) |
| | | { |
| | | if (isEndOfDraftCNReached) |
| | | { |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | assignNewDraftCNAndStore(oldestChange); |
| | | return true; |
| | | } |
| | | |
| | | |
| | | // the next change from the DraftCN db |
| | | ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber(); |
| | | String dnFromDraftCNDb = draftCNDbIter.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " comparing the 2 db DNs :" + dnFromChangelogDb + "?=" |
| | | + cnFromChangelogDb + " timestamps:" |
| | | + new Date(cnFromChangelogDb.getTime()) + " ?older" |
| | | + new Date(cnFromDraftCNDb.getTime())); |
| | | |
| | | |
| | | if (areSameChange(cnFromChangelogDb, dnFromChangelogDb, |
| | | cnFromDraftCNDb, dnFromDraftCNDb)) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " assigning draftCN=" + draftCNDbIter.getDraftCN() |
| | | + " to change=" + oldestChange); |
| | | |
| | | oldestChange.setDraftChangeNumber(draftCNDbIter.getDraftCN()); |
| | | return true; |
| | | } |
| | | |
| | | |
| | | if (!cnFromDraftCNDb.older(cnFromChangelogDb)) |
| | | { |
| | | // the change from the changelogDb is older |
| | | // it should have been stored lately |
| | | // let's continue to traverse the changelogdb |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate: will skip " + cnFromChangelogDb |
| | | + " and read next from the regular changelog."); |
| | | return false; // TO BE CHECKED |
| | | } |
| | | |
| | | |
| | | // the change from the DraftCNDb is older |
| | | // that means that the change has been purged from the |
| | | // changelogDb (and DraftCNdb not yet been trimmed) |
| | | try |
| | | { |
| | | // let's traverse the DraftCNdb searching for the change |
| | | // found in the changelogDb. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " will skip " + cnFromDraftCNDb |
| | | + " and read next change from the DraftCNDb."); |
| | | |
| | | isEndOfDraftCNReached = !draftCNDbIter.next(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate generating draftCN " |
| | | + " has skipped to " + " sn=" + draftCNDbIter.getDraftCN() |
| | | + " cn=" + draftCNDbIter.getChangeNumber() |
| | | + " End of draftCNDb ?" + isEndOfDraftCNReached); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | // FIXME There is an opportunity for an infinite loop here if the DB |
| | | // continuously throws DatabaseExceptions |
| | | } |
| | | } |
| | | } |
| | | |
| | | private boolean areSameChange(ChangeNumber cn1, String dn1, ChangeNumber cn2, |
| | | String dn2) |
| | | { |
| | |
| | | return sameDN && sameCN; |
| | | } |
| | | |
| | | private void storeNewChange(DraftCNDbHandler draftCNDb, ECLUpdateMsg change, |
| | | String suffix) |
| | | private void assignNewDraftCNAndStore(ECLUpdateMsg change) |
| | | throws DirectoryException |
| | | { |
| | | // generate a new draftCN and assign to this change |
| | | change.setDraftChangeNumber(replicationServer.getNewDraftCN()); |
| | | |
| | | // store in DraftCNdb the pair |
| | | // (DraftCN of the current change, state before this change) |
| | | DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler(); |
| | | draftCNDb.add( |
| | | change.getDraftChangeNumber(), |
| | | previousCookie.toString(), |
| | | suffix, |
| | | change.getBaseDN(), |
| | | change.getUpdateMsg().getChangeNumber()); |
| | | } |
| | | |