| | |
| | | */ |
| | | private class ScanSearchListener implements InternalSearchListener |
| | | { |
| | | private final ChangeNumber startingChangeNumber; |
| | | private final ChangeNumber endChangeNumber; |
| | | private final CSN startCSN; |
| | | private final CSN endCSN; |
| | | |
| | | public ScanSearchListener( |
| | | ChangeNumber startingChangeNumber, |
| | | ChangeNumber endChangeNumber) |
| | | public ScanSearchListener(CSN startCSN, CSN endCSN) |
| | | { |
| | | this.startingChangeNumber = startingChangeNumber; |
| | | this.endChangeNumber = endChangeNumber; |
| | | this.startCSN = startCSN; |
| | | this.endCSN = endCSN; |
| | | } |
| | | |
| | | @Override |
| | |
| | | InternalSearchOperation searchOperation, SearchResultEntry searchEntry) |
| | | throws DirectoryException |
| | | { |
| | | // Build the list of Operations that happened on this entry |
| | | // after startingChangeNumber and before endChangeNumber and |
| | | // add them to the replayOperations list |
| | | // Build the list of Operations that happened on this entry after startCSN |
| | | // and before endCSN and add them to the replayOperations list |
| | | Iterable<FakeOperation> updates = |
| | | EntryHistorical.generateFakeOperations(searchEntry); |
| | | |
| | | for (FakeOperation op : updates) |
| | | { |
| | | ChangeNumber cn = op.getChangeNumber(); |
| | | if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber))) |
| | | CSN csn = op.getCSN(); |
| | | if (csn.newer(startCSN) && csn.older(endCSN)) |
| | | { |
| | | synchronized (replayOperations) |
| | | { |
| | | replayOperations.put(cn, op); |
| | | replayOperations.put(csn, op); |
| | | } |
| | | } |
| | | } |
| | |
| | | private volatile long generationId = -1; |
| | | private volatile boolean generationIdSavedStatus = false; |
| | | |
| | | private final ChangeNumberGenerator generator; |
| | | private final CSNGenerator generator; |
| | | |
| | | /** |
| | | * This object is used to store the list of update currently being |
| | |
| | | * This list is used to temporarily store operations that need to be replayed |
| | | * at session establishment time. |
| | | */ |
| | | private final SortedMap<ChangeNumber, FakeOperation> replayOperations = |
| | | new TreeMap<ChangeNumber, FakeOperation>(); |
| | | private final SortedMap<CSN, FakeOperation> replayOperations = |
| | | new TreeMap<CSN, FakeOperation>(); |
| | | |
| | | /** |
| | | * The isolation policy that this domain is going to use. |
| | |
| | | |
| | | /** |
| | | * This configuration boolean indicates if this ReplicationDomain should log |
| | | * ChangeNumbers. |
| | | * CSNs. |
| | | */ |
| | | private boolean logChangeNumber = false; |
| | | private boolean logCSN = false; |
| | | |
| | | /** |
| | | * This configuration integer indicates the time the domain keeps the |
| | | * historical information necessary to solve conflicts.<br> |
| | | * When a change stored in the historical part of the user entry has a date |
| | | * (from its replication ChangeNumber) older than this delay, it is candidate |
| | | * to be purged. |
| | | * (from its replication CSN) older than this delay, it is a candidate to be |
| | | * purged. |
| | | */ |
| | | private long histPurgeDelayInMilliSec = 0; |
| | | |
| | | /** |
| | | * The last change number purged in this domain. Allows to have a continuous |
| | | * purging process from one purge processing (task run) to the next one. |
| | | * Values 0 when the server starts. |
| | | * The last CSN purged in this domain. Allows a continuous purging process |
| | | * from one purge processing (task run) to the next one. Its value is 0 when |
| | | * the server starts. |
| | | */ |
| | | private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0); |
| | | private CSN lastCSNPurgedFromHist = new CSN(0,0,0); |
| | | |
| | | /** |
| | | * The thread that periodically saves the ServerState of this |
| | |
| | | */ |
| | | private class RSUpdater extends DirectoryThread |
| | | { |
| | | private final ChangeNumber startChangeNumber; |
| | | private final CSN startCSN; |
| | | |
| | | |
| | | |
| | | protected RSUpdater(ChangeNumber replServerMaxChangeNumber) |
| | | protected RSUpdater(CSN replServerMaxCSN) |
| | | { |
| | | super("Replica DS(" + serverId |
| | | + ") missing change publisher for domain \"" |
| | | + baseDn.toString() + "\""); |
| | | this.startChangeNumber = replServerMaxChangeNumber; |
| | | this.startCSN = replServerMaxCSN; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | try |
| | | { |
| | | if (buildAndPublishMissingChanges(startChangeNumber, broker)) |
| | | if (buildAndPublishMissingChanges(startCSN, broker)) |
| | | { |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | |
| | | |
| | | this.isolationPolicy = configuration.getIsolationPolicy(); |
| | | this.configDn = configuration.dn(); |
| | | this.logChangeNumber = configuration.isLogChangenumber(); |
| | | this.logCSN = configuration.isLogChangenumber(); |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | this.histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | |
| | | |
| | | /* |
| | | * Create a new Persistent Server State that will be used to store |
| | | * the last ChangeNumber seen from all LDAP servers in the topology. |
| | | * the last CSN seen from all LDAP servers in the topology. |
| | | */ |
| | | state = new PersistentServerState(baseDn, serverId, getServerState()); |
| | | |
| | | flushThread = new ServerStateFlush(); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on this replication domain. |
| | | * CSNGenerator is used to create new unique CSNs for each operation done on |
| | | * this replication domain. |
| | | * |
| | | * The generator time is adjusted to the time of the last CN received from |
| | | * The generator time is adjusted to the time of the last CSN received from |
| | | * other remote servers. |
| | | */ |
| | | generator = getGenerator(); |
| | |
| | | { |
| | | // There is no replication context attached to the operation |
| | | // so this is not a replication operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(deleteOperation); |
| | | CSN csn = generateCSN(deleteOperation); |
| | | String modifiedEntryUUID = EntryHistorical.getEntryUUID(deletedEntry); |
| | | ctx = new DeleteContext(changeNumber, modifiedEntryUUID); |
| | | ctx = new DeleteContext(csn, modifiedEntryUUID); |
| | | deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | |
| | | synchronized (replayOperations) |
| | |
| | | replayOperations.remove(replayOperations.firstKey()); |
| | | } |
| | | replayOperations.put( |
| | | changeNumber, |
| | | csn, |
| | | new FakeDelOperation( |
| | | deleteOperation.getEntryDN().toString(), |
| | | changeNumber,modifiedEntryUUID )); |
| | | csn, modifiedEntryUUID)); |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | EntryHistorical hist = EntryHistorical.newInstanceFromEntry( |
| | | modifyDNOperation.getOriginalEntry()); |
| | | if (hist.addedOrRenamedAfter(ctx.getChangeNumber())) |
| | | if (hist.addedOrRenamedAfter(ctx.getCSN())) |
| | | { |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.NO_OPERATION, null); |
| | |
| | | { |
| | | // There is no replication context attached to the operation |
| | | // so this is not a replication operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation); |
| | | CSN csn = generateCSN(modifyDNOperation); |
| | | String newParentId = null; |
| | | if (modifyDNOperation.getNewSuperior() != null) |
| | | { |
| | |
| | | |
| | | Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); |
| | | String modifiedEntryUUID = EntryHistorical.getEntryUUID(modifiedEntry); |
| | | ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId); |
| | | ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId); |
| | | modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | | return new SynchronizationProviderResult.ContinueProcessing(); |
| | |
| | | public SynchronizationProviderResult handleConflictResolution( |
| | | PreOperationModifyOperation modifyOperation) |
| | | { |
| | | if ((!modifyOperation.isSynchronizationOperation()) |
| | | && (!brokerIsConnected())) |
| | | if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected()) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | |
| | | if (ctx == null) |
| | | { |
| | | // No replication ctx attached => not a replicated operation |
| | | // - create a ctx with : changeNumber, entryUUID |
| | | // - create a ctx with : CSN, entryUUID |
| | | // - attach the context to the op |
| | | |
| | | ChangeNumber changeNumber = generateChangeNumber(modifyOperation); |
| | | CSN csn = generateCSN(modifyOperation); |
| | | String modifiedEntryUUID = EntryHistorical.getEntryUUID(modifiedEntry); |
| | | ctx = new ModifyContext(changeNumber, modifiedEntryUUID); |
| | | ctx = new ModifyContext(csn, modifiedEntryUUID); |
| | | |
| | | modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | |
| | | */ |
| | | public void doPreOperation(PreOperationAddOperation addOperation) |
| | | { |
| | | AddContext ctx = new AddContext(generateChangeNumber(addOperation), |
| | | AddContext ctx = new AddContext(generateCSN(addOperation), |
| | | EntryHistorical.getEntryUUID(addOperation), |
| | | findEntryUUID(addOperation.getEntryDN().getParentDNInSuffix())); |
| | | |
| | |
| | | ResultCode result = op.getResultCode(); |
| | | // Note that a failed non-replication operation might not have a CSN |
| | | // (change number). |
| | | ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op); |
| | | if ((curChangeNumber != null) && (logChangeNumber)) |
| | | CSN curCSN = OperationContext.getCSN(op); |
| | | if (curCSN != null && logCSN) |
| | | { |
| | | op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(), |
| | | "replicationCN", curChangeNumber)); |
| | | "replicationCSN", curCSN)); |
| | | } |
| | | |
| | | if (result == ResultCode.SUCCESS) |
| | |
| | | numReplayedPostOpCalled++; |
| | | try |
| | | { |
| | | remotePendingChanges.commit(curChangeNumber); |
| | | remotePendingChanges.commit(curCSN); |
| | | } |
| | | catch (NoSuchElementException e) |
| | | { |
| | | Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get( |
| | | op.toString(), curChangeNumber.toString()); |
| | | op.toString(), curCSN.toString()); |
| | | logError(message); |
| | | return; |
| | | } |
| | |
| | | * This is an operation type that we do not know about |
| | | * It should never happen. |
| | | */ |
| | | pendingChanges.remove(curChangeNumber); |
| | | pendingChanges.remove(curCSN); |
| | | Message message = |
| | | ERR_UNKNOWN_TYPE.get(op.getOperationType().toString()); |
| | | logError(message); |
| | |
| | | try |
| | | { |
| | | msg.encode(); |
| | | pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg); |
| | | pendingChanges.commitAndPushCommittedChanges(curCSN, msg); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // will be caught at publish time. |
| | |
| | | catch (NoSuchElementException e) |
| | | { |
| | | Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get( |
| | | op.toString(), curChangeNumber.toString()); |
| | | op.toString(), curCSN.toString()); |
| | | logError(message); |
| | | return; |
| | | } |
| | |
| | | { |
| | | // Remove an unsuccessful non-replication operation from the pending |
| | | // changes list. |
| | | if (curChangeNumber != null) |
| | | if (curCSN != null) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | pendingChanges.remove(curCSN); |
| | | pendingChanges.pushCommittedChanges(); |
| | | } |
| | | } |
| | |
| | | attrs, null); |
| | | |
| | | Entry entryToRename = null; |
| | | ChangeNumber entryToRenameCN = null; |
| | | CSN entryToRenameCSN = null; |
| | | for (SearchResultEntry entry : searchOp.getSearchEntries()) |
| | | { |
| | | EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry); |
| | | if (entryToRename == null) |
| | | { |
| | | entryToRename = entry; |
| | | entryToRenameCN = history.getDNDate(); |
| | | entryToRenameCSN = history.getDNDate(); |
| | | } |
| | | else if (!history.addedOrRenamedAfter(entryToRenameCN)) |
| | | else if (!history.addedOrRenamedAfter(entryToRenameCSN)) |
| | | { |
| | | // this conflict is older than the previous, keep it. |
| | | entryToRename = entry; |
| | | entryToRenameCN = history.getDNDate(); |
| | | entryToRenameCSN = history.getDNDate(); |
| | | } |
| | | } |
| | | |
| | |
| | | Operation op = null; |
| | | boolean replayDone = false; |
| | | boolean dependency = false; |
| | | ChangeNumber changeNumber = null; |
| | | CSN csn = null; |
| | | int retryCount = 10; |
| | | |
| | | // Try replay the operation, then flush (replaying) any pending operation |
| | |
| | | // are processed locally. |
| | | op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL)); |
| | | |
| | | changeNumber = OperationContext.getChangeNumber(op); |
| | | csn = OperationContext.getCSN(op); |
| | | op.run(); |
| | | |
| | | ResultCode result = op.getResultCode(); |
| | |
| | | // the update became a dummy update and the result |
| | | // of the conflict resolution phase is to do nothing. |
| | | // however we still need to push this change to the serverState |
| | | updateError(changeNumber); |
| | | updateError(csn); |
| | | } |
| | | else |
| | | { |
| | |
| | | logError(message); |
| | | numUnresolvedNamingConflicts.incrementAndGet(); |
| | | replayErrorMsg = message.toString(); |
| | | updateError(changeNumber); |
| | | updateError(csn); |
| | | } |
| | | } catch (ASN1Exception e) |
| | | { |
| | |
| | | replayErrorMsg = logDecodingOperationError(msg, e); |
| | | } catch (Exception e) |
| | | { |
| | | if (changeNumber != null) |
| | | if (csn != null) |
| | | { |
| | | /* |
| | | * An Exception happened during the replay process. |
| | |
| | | stackTraceToSingleLineString(e), op.toString()); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | updateError(changeNumber); |
| | | updateError(csn); |
| | | } else |
| | | { |
| | | replayErrorMsg = logDecodingOperationError(msg, e); |
| | |
| | | // Prepare restart of loop |
| | | replayDone = false; |
| | | dependency = false; |
| | | changeNumber = null; |
| | | csn = null; |
| | | retryCount = 10; |
| | | |
| | | } while (msg != null); |
| | |
| | | * It is necessary because the postOperation does not always get |
| | | * called when error or Exceptions happen during the operation replay. |
| | | * |
| | | * @param changeNumber the ChangeNumber of the operation with error. |
| | | * @param csn the CSN of the operation with error. |
| | | */ |
| | | public void updateError(ChangeNumber changeNumber) |
| | | public void updateError(CSN csn) |
| | | { |
| | | try |
| | | { |
| | | remotePendingChanges.commit(changeNumber); |
| | | remotePendingChanges.commit(csn); |
| | | } |
| | | catch (NoSuchElementException e) |
| | | { |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "LDAPReplicationDomain.updateError: Unable to find remote " |
| | | + "pending change for change number %s", |
| | | changeNumber); |
| | | "LDAPReplicationDomain.updateError: Unable to find remote " |
| | | + "pending change for CSN %s", csn); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Generate a new change number and insert it in the pending list. |
| | | * Generate a new CSN and insert it in the pending list. |
| | | * <p> |
| | | * The value is obtained from {@code pendingChanges.putLocalOperation} and |
| | | * returned to the caller unchanged. |
| | | * |
| | | * @param operation The operation for which the change number must be |
| | | * generated. |
| | | * @return The new change number. |
| | | * @param operation |
| | | * The operation for which the CSN must be generated. |
| | | * @return The new CSN, as returned by |
| | | * {@code pendingChanges.putLocalOperation(operation)}. |
| | | */ |
| | | private ChangeNumber generateChangeNumber(PluginOperation operation) |
| | | private CSN generateCSN(PluginOperation operation) |
| | | { |
| | | return pendingChanges.putLocalOperation(operation); |
| | | } |
| | |
| | | // The other type of errors can not be caused by naming conflicts. |
| | | // Log a message for the repair tool. |
| | | Message message = ERR_ERROR_REPLAYING_OPERATION.get( |
| | | op.toString(), ctx.getChangeNumber().toString(), |
| | | op.toString(), ctx.getCSN().toString(), |
| | | result.toString(), op.getErrorMessage().toString()); |
| | | logError(message); |
| | | return true; |
| | |
| | | // The other type of errors can not be caused by naming conflicts. |
| | | // Log a message for the repair tool. |
| | | Message message = ERR_ERROR_REPLAYING_OPERATION.get( |
| | | op.toString(), ctx.getChangeNumber().toString(), |
| | | op.toString(), ctx.getCSN().toString(), |
| | | result.toString(), op.getErrorMessage().toString()); |
| | | logError(message); |
| | | return true; |
| | |
| | | // The other type of errors can not be caused by naming conflicts. |
| | | // Log a message for the repair tool. |
| | | Message message = ERR_ERROR_REPLAYING_OPERATION.get( |
| | | op.toString(), ctx.getChangeNumber().toString(), |
| | | op.toString(), ctx.getCSN().toString(), |
| | | result.toString(), op.getErrorMessage().toString()); |
| | | logError(message); |
| | | return true; |
| | |
| | | // The other type of errors can not be caused by naming conflicts. |
| | | // log a message for the repair tool. |
| | | Message message = ERR_ERROR_REPLAYING_OPERATION.get( |
| | | op.toString(), ctx.getChangeNumber().toString(), |
| | | op.toString(), ctx.getCSN().toString(), |
| | | result.toString(), op.getErrorMessage().toString()); |
| | | logError(message); |
| | | return true; |
| | |
| | | state.clearInMemory(); |
| | | state.loadState(); |
| | | |
| | | generator.adjust(state.getMaxChangeNumber(serverId)); |
| | | generator.adjust(state.getMaxCSN(serverId)); |
| | | // Retrieves the generation ID associated with the data imported |
| | | |
| | | generationId = loadGenerationId(); |
| | |
| | | { |
| | | String includeAttributeStrings[] = |
| | | {"objectclass", "sn", "cn", "entryuuid"}; |
| | | HashSet<AttributeType> includeAttributes; |
| | | includeAttributes = new HashSet<AttributeType>(); |
| | | Set<AttributeType> includeAttributes = new HashSet<AttributeType>(); |
| | | for (String attrName : includeAttributeStrings) |
| | | { |
| | | AttributeType attrType = DirectoryServer.getAttributeType(attrName); |
| | |
| | | |
| | | |
| | | /** |
| | | * Push the modifications contained in the given parameter as |
| | | * a modification that would happen on a local server. |
| | | * The modifications are not applied to the local database, |
| | | * historical information is not updated but a ChangeNumber |
| | | * is generated and the ServerState associated to this domain is |
| | | * updated. |
| | | * @param modifications The modification to push |
| | | * Push the modifications contained in the given parameter as a modification |
| | | * that would happen on a local server. The modifications are not applied to |
| | | * the local database, historical information is not updated but a CSN is |
| | | * generated and the ServerState associated to this domain is updated. |
| | | * |
| | | * @param modifications |
| | | * The modification to push |
| | | */ |
| | | public void synchronizeModifications(List<Modification> modifications) |
| | | { |
| | |
| | | modifications); |
| | | LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op); |
| | | |
| | | ChangeNumber cn = generateChangeNumber(localOp); |
| | | OperationContext ctx = new ModifyContext(cn, "schema"); |
| | | CSN csn = generateCSN(localOp); |
| | | OperationContext ctx = new ModifyContext(csn, "schema"); |
| | | localOp.setAttachment(SYNCHROCONTEXT, ctx); |
| | | localOp.setResultCode(ResultCode.SUCCESS); |
| | | synchronize(localOp); |
| | |
| | | ReplicationDomainCfg configuration) |
| | | { |
| | | isolationPolicy = configuration.getIsolationPolicy(); |
| | | logChangeNumber = configuration.isLogChangenumber(); |
| | | logCSN = configuration.isLogChangenumber(); |
| | | histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | DN eclConfigEntryDN = DN.decode( |
| | | "cn=external changeLog," + configDn); |
| | | |
| | | DN eclConfigEntryDN = DN.decode("cn=external changeLog," + configDn); |
| | | if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) |
| | | { |
| | | DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null); |
| | |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | replicationServerState.getChangeNumber(serverId); |
| | | CSN replServerMaxCSN = replicationServerState.getCSN(serverId); |
| | | |
| | | // we don't want to update from here (a DS) an empty RS because |
| | | // normally the RS should have been updated by other RSes except for |
| | |
| | | // ... hence the RS we are connected to should not be empty |
| | | // ... or if it is empty, it is due to a voluntary reset |
| | | // and we don't want to update it with our changes that could be huge. |
| | | if ((replServerMaxChangeNumber != null) && |
| | | (replServerMaxChangeNumber.getSeqnum()!=0)) |
| | | if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0) |
| | | { |
| | | ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverId); |
| | | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | CSN ourMaxCSN = state.getMaxCSN(serverId); |
| | | if (ourMaxCSN != null && !ourMaxCSN.olderOrEqual(replServerMaxCSN)) |
| | | { |
| | | pendingChanges.setRecovering(true); |
| | | broker.setRecoveryRequired(true); |
| | | new RSUpdater(replServerMaxChangeNumber).start(); |
| | | new RSUpdater(replServerMaxCSN).start(); |
| | | } |
| | | } |
| | | } catch (Exception e) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Build the list of changes that have been processed by this server |
| | | * after the ChangeNumber given as a parameter and publish them |
| | | * using the given session. |
| | | * Build the list of changes that have been processed by this server after the |
| | | * CSN given as a parameter and publish them using the given session. |
| | | * |
| | | * @param startingChangeNumber The ChangeNumber where we need to start the |
| | | * search |
| | | * @param session The session to use to publish the changes |
| | | * |
| | | * @return A boolean indicating he success of the |
| | | * operation. |
| | | * @throws Exception if an Exception happens during the search. |
| | | * @param startCSN |
| | | * The CSN where we need to start the search |
| | | * @param session |
| | | * The session to use to publish the changes |
| | | * @return A boolean indicating the success of the operation. |
| | | * @throws Exception |
| | | * if an Exception happens during the search. |
| | | */ |
| | | public boolean buildAndPublishMissingChanges( |
| | | ChangeNumber startingChangeNumber, |
| | | ReplicationBroker session) |
| | | throws Exception |
| | | public boolean buildAndPublishMissingChanges(CSN startCSN, |
| | | ReplicationBroker session) throws Exception |
| | | { |
| | | // Trim the changes in replayOperations that are older than |
| | | // the startingChangeNumber. |
| | | // Trim the changes in replayOperations that are older than the startCSN. |
| | | synchronized (replayOperations) |
| | | { |
| | | Iterator<ChangeNumber> it = replayOperations.keySet().iterator(); |
| | | Iterator<CSN> it = replayOperations.keySet().iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | if (it.next().olderOrEqual(startingChangeNumber)) |
| | | if (it.next().olderOrEqual(startCSN)) |
| | | { |
| | | it.remove(); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | ChangeNumber lastRetrievedChange; |
| | | CSN lastRetrievedChange; |
| | | InternalSearchOperation op; |
| | | ChangeNumber currentStartChangeNumber = startingChangeNumber; |
| | | CSN currentStartCSN = startCSN; |
| | | do |
| | | { |
| | | lastRetrievedChange = null; |
| | |
| | | // So we search by interval of 10 seconds |
| | | // and store the results in the replayOperations list |
| | | // so that they are sorted before sending them. |
| | | long missingChangesDelta = currentStartChangeNumber.getTime() + 10000; |
| | | ChangeNumber endChangeNumber = |
| | | new ChangeNumber( |
| | | missingChangesDelta, 0xffffffff, serverId); |
| | | long missingChangesDelta = currentStartCSN.getTime() + 10000; |
| | | CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, serverId); |
| | | |
| | | ScanSearchListener listener = |
| | | new ScanSearchListener(currentStartChangeNumber, endChangeNumber); |
| | | op = searchForChangedEntries( |
| | | baseDn, currentStartChangeNumber, endChangeNumber, listener); |
| | | new ScanSearchListener(currentStartCSN, endCSN); |
| | | op = searchForChangedEntries(baseDn, currentStartCSN, endCSN, listener); |
| | | |
| | | // Publish and remove all the changes from the replayOperations list |
| | | // that are older than the endChangeNumber. |
| | | // that are older than the endCSN. |
| | | List<FakeOperation> opsToSend = new LinkedList<FakeOperation>(); |
| | | synchronized (replayOperations) |
| | | { |
| | |
| | | while (itOp.hasNext()) |
| | | { |
| | | FakeOperation fakeOp = itOp.next(); |
| | | if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber)) |
| | | && state.cover(fakeOp.getChangeNumber())) |
| | | if ((fakeOp.getCSN().olderOrEqual(endCSN)) |
| | | && state.cover(fakeOp.getCSN())) |
| | | { |
| | | lastRetrievedChange = fakeOp.getChangeNumber(); |
| | | lastRetrievedChange = fakeOp.getCSN(); |
| | | opsToSend.add(fakeOp); |
| | | itOp.remove(); |
| | | } |
| | |
| | | opsToSend.clear(); |
| | | if (lastRetrievedChange != null) |
| | | { |
| | | currentStartChangeNumber = lastRetrievedChange; |
| | | currentStartCSN = lastRetrievedChange; |
| | | } |
| | | else |
| | | { |
| | | currentStartChangeNumber = endChangeNumber; |
| | | currentStartCSN = endCSN; |
| | | } |
| | | |
| | | } while (pendingChanges.recoveryUntil(lastRetrievedChange) && |
| | |
| | | |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. The only changes that will |
| | | * be send will be the one generated on the serverId provided in |
| | | * fromChangeNumber. |
| | | * @param baseDn the base DN |
| | | * @param fromChangeNumber The ChangeNumber from which we want the changes |
| | | * @param lastChangeNumber The max ChangeNumber that the search should return |
| | | * @param resultListener The listener that will process the entries returned |
| | | * Search for the changes that happened since fromCSN based on the historical |
| | | * attribute. The only changes that will be sent are the ones generated on |
| | | * the serverId provided in fromCSN. |
| | | * |
| | | * @param baseDn |
| | | * the base DN |
| | | * @param fromCSN |
| | | * The CSN from which we want the changes |
| | | * @param lastCSN |
| | | * The max CSN that the search should return |
| | | * @param resultListener |
| | | * The listener that will process the entries returned |
| | | * @return the internal search operation |
| | | * @throws Exception when raised. |
| | | * @throws Exception |
| | | * when raised. |
| | | */ |
| | | public static InternalSearchOperation searchForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | ChangeNumber lastChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | public static InternalSearchOperation searchForChangedEntries(DN baseDn, |
| | | CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener) |
| | | throws Exception |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | Integer serverId = fromChangeNumber.getServerId(); |
| | | Integer serverId = fromCSN.getServerId(); |
| | | |
| | | String maxValueForId; |
| | | if (lastChangeNumber == null) |
| | | if (lastCSN == null) |
| | | { |
| | | maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId) |
| | | + "ffffffff"; |
| | | } |
| | | else |
| | | { |
| | | maxValueForId = lastChangeNumber.toString(); |
| | | maxValueForId = lastCSN.toString(); |
| | | } |
| | | |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "(&(" + EntryHistorical.HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" |
| | | + fromChangeNumber + ")(" + EntryHistorical.HISTORICAL_ATTRIBUTE_NAME + |
| | | "<=dummy:" + maxValueForId + "))"); |
| | | "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" + |
| | | "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))"); |
| | | |
| | | Set<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME); |
| | | attrs.add(EntryHistorical.ENTRYUUID_ATTRIBUTE_NAME); |
| | | Set<String> attrs = new LinkedHashSet<String>(3); |
| | | attrs.add(HISTORICAL_ATTRIBUTE_NAME); |
| | | attrs.add(ENTRYUUID_ATTRIBUTE_NAME); |
| | | attrs.add("*"); |
| | | return conn.processSearch( |
| | | ByteString.valueOf(baseDn.toString()), |
| | |
| | | } |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. The only changes that will |
| | | * be send will be the one generated on the serverId provided in |
| | | * fromChangeNumber. |
| | | * @param baseDn the base DN |
| | | * @param fromChangeNumber The change number from which we want the changes |
| | | * @param resultListener that will process the entries returned. |
| | | * Search for the changes that happened since fromCSN based on the historical |
| | | * attribute. The only changes that will be sent are the ones generated on |
| | | * the serverId provided in fromCSN. |
| | | * <p> |
| | | * Convenience overload: delegates to the four-argument |
| | | * {@code searchForChangedEntries}, passing {@code null} as the upper bound |
| | | * ({@code lastCSN}). |
| | | * |
| | | * @param baseDn |
| | | * the base DN |
| | | * @param fromCSN |
| | | * The CSN from which we want the changes |
| | | * @param resultListener |
| | | * The listener that will process the entries returned. |
| | | * @return the internal search operation |
| | | * @throws Exception when raised. |
| | | * @throws Exception |
| | | * when raised by the delegated search. |
| | | */ |
| | | public static InternalSearchOperation searchForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | public static InternalSearchOperation searchForChangedEntries(DN baseDn, |
| | | CSN fromCSN, InternalSearchListener resultListener) throws Exception |
| | | { |
| | | return searchForChangedEntries( |
| | | baseDn, fromChangeNumber, null, resultListener); |
| | | return searchForChangedEntries(baseDn, fromCSN, null, resultListener); |
| | | } |
| | | |
| | | |
| | |
| | | public long countEntries() throws DirectoryException |
| | | { |
| | | Backend backend = retrievesBackend(baseDn); |
| | | |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | | Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( |
| | |
| | | TRACER.debugInfo("[PURGE] purgeConflictsHistorical " |
| | | + "on domain: " + baseDn |
| | | + "endDate:" + new Date(endDate) |
| | | + "lastChangeNumberPurgedFromHist: " |
| | | + lastChangeNumberPurgedFromHist.toStringUI()); |
| | | + "lastCSNPurgedFromHist: " |
| | | + lastCSNPurgedFromHist.toStringUI()); |
| | | |
| | | LDAPFilter filter = null; |
| | | try |
| | | { |
| | | filter = LDAPFilter.decode( |
| | | "(" + EntryHistorical.HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" |
| | | + lastChangeNumberPurgedFromHist + ")"); |
| | | + lastCSNPurgedFromHist + ")"); |
| | | |
| | | } catch (LDAPException e) |
| | | { |
| | | // Not possible. We know the filter just above is correct. |
| | | } |
| | | |
| | | Set<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME); |
| | | attrs.add(EntryHistorical.ENTRYUUID_ATTRIBUTE_NAME); |
| | | Set<String> attrs = new LinkedHashSet<String>(3); |
| | | attrs.add(HISTORICAL_ATTRIBUTE_NAME); |
| | | attrs.add(ENTRYUUID_ATTRIBUTE_NAME); |
| | | attrs.add("*"); |
| | | InternalSearchOperation searchOp = conn.processSearch( |
| | | ByteString.valueOf(baseDn.toString()), |
| | |
| | | int count = 0; |
| | | |
| | | if (task != null) |
| | | task.setProgressStats(lastChangeNumberPurgedFromHist, count); |
| | | task.setProgressStats(lastCSNPurgedFromHist, count); |
| | | |
| | | for (SearchResultEntry entry : searchOp.getSearchEntries()) |
| | | { |
| | |
| | | } |
| | | |
| | | EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); |
| | | lastChangeNumberPurgedFromHist = entryHist.getOldestCN(); |
| | | lastCSNPurgedFromHist = entryHist.getOldestCSN(); |
| | | entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec); |
| | | Attribute attr = entryHist.encodeAndPurge(); |
| | | count += entryHist.getLastPurgedValuesCount(); |
| | |
| | | } |
| | | else if (task != null) |
| | | { |
| | | task.setProgressStats(lastChangeNumberPurgedFromHist, count); |
| | | task.setProgressStats(lastCSNPurgedFromHist, count); |
| | | } |
| | | } |
| | | } |