| | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | |
| | | /** |
| | | * Specifies the last changer number requested. |
| | | */ |
| | | private int lastChangeNumber = 0; |
| | | private long lastChangeNumber = 0; |
| | | /** |
| | | * Specifies whether the change number db has been read until its end. |
| | | */ |
| | |
| | | * @throws DirectoryException |
| | | * When an error is raised. |
| | | */ |
| | | private void initializeCLSearchFromChangeNumber(int startChangeNumber) |
| | | private void initializeCLSearchFromChangeNumber(long startChangeNumber) |
| | | throws DirectoryException |
| | | { |
| | | try |
| | |
| | | catch(DirectoryException de) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, de); |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | throw de; |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | throw new DirectoryException( |
| | | ResultCode.OPERATIONS_ERROR, |
| | | Message.raw(Category.SYNC, |
| | |
| | | * @throws DirectoryException |
| | | * if a database problem occurred |
| | | */ |
| | | private String findCookie(final int startChangeNumber) |
| | | throws ChangelogException, |
| | | DirectoryException |
| | | private String findCookie(final long startChangeNumber) |
| | | throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexDB cnIndexDB = |
| | | replicationServer.getChangeNumberIndexDB(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | final long firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | final String crossDomainStartState = |
| | | cnIndexDB.getPreviousCookie(firstChangeNumber); |
| | | final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData(); |
| | | final long firstChangeNumber = firstCNData.getChangeNumber(); |
| | | final String crossDomainStartState = firstCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | // Request filter DOES contain a startChangeNumber |
| | | |
| | | // Read the draftCNDb to see whether it contains startChangeNumber |
| | | String crossDomainStartState = |
| | | cnIndexDB.getPreviousCookie(startChangeNumber); |
| | | if (crossDomainStartState != null) |
| | | CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber); |
| | | if (startCNData != null) |
| | | { |
| | | // found the provided startChangeNumber, let's return it |
| | | final String crossDomainStartState = startCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | // the DB, let's use the lower limit. |
| | | if (startChangeNumber < firstChangeNumber) |
| | | { |
| | | crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber); |
| | | if (crossDomainStartState != null) |
| | | CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber); |
| | | if (firstCNData != null) |
| | | { |
| | | final String crossDomainStartState = firstCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | final long lastKey = cnIndexDB.getLastChangeNumber(); |
| | | crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey); |
| | | final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData(); |
| | | final long lastKey = lastCNData.getChangeNumber(); |
| | | final String crossDomainStartState = lastCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); |
| | | return crossDomainStartState; |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + " shutdown()"); |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | if (!domainCtxt.unRegisterHandler()) { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | |
| | | domainCtxts = null; |
| | | } |
| | | |
| | | private void releaseIterator() |
| | | private void releaseCursor() |
| | | { |
| | | if (this.cnIndexDBCursor != null) |
| | | { |
| | |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getCSN()); |
| | | |
| | | if (oldestContext.currentState.cover(oldestContext.stopState)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | | if (draftCompat |
| | | && lastChangeNumber > 0 |
| | | && change.getChangeNumber() > lastChangeNumber) |
| | | if (oldestContext.currentState.cover(oldestContext.stopState) |
| | | || (draftCompat |
| | | && lastChangeNumber > 0 |
| | | && change.getChangeNumber() > lastChangeNumber)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | |
| | | if (searchPhase == PERSISTENT_PHASE) |
| | | { |
| | | if (debugEnabled()) |
| | | clDomCtxtsToString("In getNextECLUpdate (persistent): " + |
| | | "looking for the generalized oldest change"); |
| | | TRACER.debugInfo(clDomCtxtsToString( |
| | | "In getNextECLUpdate (persistent): " |
| | | + "looking for the generalized oldest change")); |
| | | |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.getNextEligibleMessageForDomain(operationId); |
| | |
| | | |
| | | if (draftCompat) |
| | | { |
| | | assignNewDraftCNAndStore(change); |
| | | assignNewChangeNumberAndStore(change); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | |
| | | |
| | | if (oldestChange != null) |
| | | { |
| | | final CSN csn = oldestChange.getUpdateMsg().getCSN(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" |
| | | + oldestChange.getUpdateMsg().getCSN()); |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn); |
| | | |
| | | // Update the current state |
| | | previousCookie.update( |
| | | oldestChange.getBaseDN(), |
| | | oldestChange.getUpdateMsg().getCSN()); |
| | | previousCookie.update(oldestChange.getBaseDN(), csn); |
| | | |
| | | // Set the current value of global state in the returned message |
| | | oldestChange.setCookie(previousCookie); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate returns result oldest change =" + |
| | | oldestChange); |
| | | TRACER.debugInfo("getNextECLUpdate returns result oldestChange=" |
| | | + oldestChange); |
| | | |
| | | } |
| | | return oldestChange; |
| | |
| | | if (isEndOfCNIndexDBReached) |
| | | { |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | assignNewDraftCNAndStore(oldestChange); |
| | | assignNewChangeNumberAndStore(oldestChange); |
| | | return true; |
| | | } |
| | | |
| | | |
| | | // the next change from the CNIndexDB |
| | | CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN(); |
| | | String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN(); |
| | | final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData(); |
| | | final CSN csnFromDraftCNDb = cnIndexData.getCSN(); |
| | | final String dnFromDraftCNDb = cnIndexData.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber() |
| | | + " assigning changeNumber=" + cnIndexData.getChangeNumber() |
| | | + " to change=" + oldestChange); |
| | | |
| | | oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber()); |
| | | oldestChange.setChangeNumber(cnIndexData.getChangeNumber()); |
| | | return true; |
| | | } |
| | | |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number has" |
| | | + "skipped to changeNumber=" + cnIndexDBCursor.getChangeNumber() |
| | | + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?" |
| | | + "skipped to changeNumber=" + cnIndexData.getChangeNumber() |
| | | + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?" |
| | | + isEndOfCNIndexDBReached); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | return sameDN && sameCSN; |
| | | } |
| | | |
| | | private void assignNewDraftCNAndStore(ECLUpdateMsg change) |
| | | private void assignNewChangeNumberAndStore(ECLUpdateMsg change) |
| | | throws DirectoryException, ChangelogException |
| | | { |
| | | // generate a new change number and assign to this change |
| | |
| | | |
| | | // store in CNIndexDB the pair |
| | | // (change number of the current change, state before this change) |
| | | replicationServer.getChangeNumberIndexDB().add( |
| | | replicationServer.getChangeNumberIndexDB().add(new CNIndexData( |
| | | change.getChangeNumber(), |
| | | previousCookie.toString(), |
| | | change.getBaseDN(), |
| | | change.getUpdateMsg().getCSN()); |
| | | change.getUpdateMsg().getCSN())); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | // End of INIT_PHASE => always release the iterator |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | } |
| | | |
| | | /** |