| | |
| | | */ |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.util.TimeThread.getTime; |
| | | import static org.opends.server.synchronization.SynchMessages.*; |
| | | import static org.opends.server.loggers.Error.*; |
| | | import static org.opends.server.messages.MessageHandler.*; |
| | | import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT; |
| | | import static org.opends.server.synchronization.Historical.*; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.api.ConfigurableComponent; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | import org.opends.server.config.StringConfigAttribute; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyDNOperation; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.Operation; |
| | | import org.opends.server.messages.MessageHandler; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPException; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.RDN; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchResultEntry; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.opends.server.types.SynchronizationProviderResult; |
| | | |
| | | /** |
| | |
| | | private ServerState state; |
| | | private int numReplayedPostOpCalled = 0; |
| | | |
| | | private boolean assuredFlag = false; |
| | | |
| | | |
| | | private int maxReceiveQueue = 0; |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | |
| | | |
| | | private DN configDn; |
| | | |
| | | private InternalClientConnection conn = new InternalClientConnection(); |
| | | |
| | | static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static String BASE_DN_ATTR = "ds-cfg-synchronization-dn"; |
| | | static String SERVER_ID_ATTR = "ds-cfg-directory-server-id"; |
| | |
| | | broker.restartReceive(); |
| | | for (int i=0; i<listenerThreadNumber; i++) |
| | | { |
| | | ListenerThread myThread = new ListenerThread(this, |
| | | changeNumberGenerator); |
| | | ListenerThread myThread = new ListenerThread(this); |
| | | myThread.start(); |
| | | synchroThreads.add(myThread); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Implement the handleConflictResolution phase of the deleteOperation. |
| | | * |
| | | * @param deleteOperation The deleteOperation. |
| | | * @return A SynchronizationProviderResult indicating if the operation |
| | | * can continue. |
| | | */ |
| | | public SynchronizationProviderResult handleConflictResolution( |
| | | DeleteOperation deleteOperation) |
| | | { |
| | | DeleteContext ctx = |
| | | (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); |
| | | Entry deletedEntry = deleteOperation.getEntryToDelete(); |
| | | |
| | | if (ctx != null) |
| | | { |
| | | /* |
| | | * This is a synchronization operation |
| | | * Check that the modified entry has the same entryuuid |
| | | * has was in the original message. |
| | | */ |
| | | String operationEntryUUID = ctx.getEntryUid(); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); |
| | | if (!operationEntryUUID.equals(modifiedEntryUUID)) |
| | | { |
| | | /* |
| | | * The changes entry is not the same entry as the one on |
| | | * the original change was performed. |
| | | * Probably the original entry was renamed and replaced with |
| | | * another entry. |
| | | * We must not let the change proceed, return a negative |
| | | * result and set the result code to NO_SUCH_OBJET. |
| | | * When the operation will return, the thread that started the |
| | | * operation will try to find the correct entry and restart a new |
| | | * operation. |
| | | */ |
| | | deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // There is no Synchronization context attached to the operation |
| | | // so this is not a synchronization operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(deleteOperation); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); |
| | | ctx = new DeleteContext(changeNumber, modifiedEntryUUID); |
| | | deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | | return new SynchronizationProviderResult(true); |
| | | } |
| | | |
| | | /** |
| | | * Implement the handleConflictResolution phase of the addOperation. |
| | | * |
| | | * @param addOperation The AddOperation. |
| | | * @return A SynchronizationProviderResult indicating if the operation |
| | | * can continue. |
| | | */ |
| | | public SynchronizationProviderResult handleConflictResolution( |
| | | AddOperation addOperation) |
| | | { |
| | | if (addOperation.isSynchronizationOperation()) |
| | | { |
| | | AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); |
| | | /* |
| | | * If an entry with the same entry uniqueID already exist then |
| | | * this operation has already been replayed in the past. |
| | | */ |
| | | String uuid = ctx.getEntryUid(); |
| | | if (findEntryDN(uuid) != null) |
| | | { |
| | | addOperation.setResultCode(ResultCode.SUCCESS); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | } |
| | | return new SynchronizationProviderResult(true); |
| | | } |
| | | |
| | | /** |
| | | * Implement the handleConflictResolution phase of the ModifyDNOperation. |
| | | * |
| | | * @param modifyDNOperation The ModifyDNOperation. |
| | | * @return A SynchronizationProviderResult indicating if the operation |
| | | * can continue. |
| | | */ |
| | | public SynchronizationProviderResult handleConflictResolution( |
| | | ModifyDNOperation modifyDNOperation) |
| | | { |
| | | ModifyDnContext ctx = |
| | | (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); |
| | | if (ctx != null) |
| | | { |
| | | /* |
| | | * This is a synchronization operation |
| | | * Check that the modified entry has the same entryuuid |
| | | * as was in the original message. |
| | | */ |
| | | String modifiedEntryUUID = |
| | | Historical.getEntryUuid(modifyDNOperation.getOriginalEntry()); |
| | | if (!modifiedEntryUUID.equals(ctx.getEntryUid())) |
| | | { |
| | | /* |
| | | * The modified entry is not the same entry as the one on |
| | | * the original change was performed. |
| | | * Probably the original entry was renamed and replaced with |
| | | * another entry. |
| | | * We must not let the change proceed, return a negative |
| | | * result and set the result code to NO_SUCH_OBJET. |
| | | * When the operation will return, the thread that started the |
| | | * operation will try to find the correct entry and restart a new |
| | | * operation. |
| | | */ |
| | | modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | if (modifyDNOperation.getNewSuperior() != null) |
| | | { |
| | | /* |
| | | * Also check that the current id of the |
| | | * parent is the same as when the operation was performed. |
| | | */ |
| | | String newParentId = findEntryId(modifyDNOperation.getNewSuperior()); |
| | | if (!newParentId.equals(ctx.getNewParentId())) |
| | | { |
| | | modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // There is no Synchronization context attached to the operation |
| | | // so this is not a synchronization operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation); |
| | | String newParentId = null; |
| | | if (modifyDNOperation.getNewSuperior() != null) |
| | | { |
| | | newParentId = findEntryId(modifyDNOperation.getNewSuperior()); |
| | | } |
| | | |
| | | Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); |
| | | ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId); |
| | | modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | | return new SynchronizationProviderResult(true); |
| | | } |
| | | |
| | | /** |
| | | * Handle the conflict resolution. |
| | | * Called by the core server after locking the entry and before |
| | | * starting the actual modification. |
| | |
| | | public SynchronizationProviderResult handleConflictResolution( |
| | | ModifyOperation modifyOperation) |
| | | { |
| | | // If operation do not yet have a change number, generate it |
| | | ChangeNumber changeNumber = |
| | | (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION); |
| | | if (changeNumber == null) |
| | | { |
| | | synchronized(pendingChanges) |
| | | { |
| | | changeNumber = changeNumberGenerator.NewChangeNumber(); |
| | | pendingChanges.put(changeNumber, new PendingChange(changeNumber, |
| | | modifyOperation, |
| | | null)); |
| | | } |
| | | modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber); |
| | | } |
| | | ModifyContext ctx = |
| | | (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); |
| | | |
| | | // if Operation is a synchronization operation, solve conflicts |
| | | if (modifyOperation.isSynchronizationOperation()) |
| | | Entry modifiedEntry = modifyOperation.getModifiedEntry(); |
| | | if (ctx == null) |
| | | { |
| | | Entry modifiedEntry = modifyOperation.getModifiedEntry(); |
| | | // There is no Synchronization context attached to the operation |
| | | // so this is not a synchronization operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(modifyOperation); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); |
| | | ctx = new ModifyContext(changeNumber, modifiedEntryUUID); |
| | | modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | | else |
| | | { |
| | | String modifiedEntryUUID = ctx.getEntryUid(); |
| | | String currentEntryUUID = Historical.getEntryUuid(modifiedEntry); |
| | | if (!currentEntryUUID.equals(modifiedEntryUUID)) |
| | | { |
| | | /* |
| | | * The current modified entry is not the same entry as the one on |
| | | * the original modification was performed. |
| | | * Probably the original entry was renamed and replaced with |
| | | * another entry. |
| | | * We must not let the modification proceed, return a negative |
| | | * result and set the result code to NO_SUCH_OBJET. |
| | | * When the operation will return, the thread that started the |
| | | * operation will try to find the correct entry and restart a new |
| | | * operation. |
| | | */ |
| | | modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | |
| | | /* |
| | | * Solve the conflicts between modify operations |
| | | */ |
| | | Historical historicalInformation = Historical.load(modifiedEntry); |
| | | modifyOperation.setAttachment(HISTORICAL, historicalInformation); |
| | | |
| | |
| | | * stop the processing and send an OK result |
| | | */ |
| | | modifyOperation.setResultCode(ResultCode.SUCCESS); |
| | | /* |
| | | * TODO : check that post operation do get called and |
| | | * that pendingChanges do get updated |
| | | */ |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | } |
| | | return new SynchronizationProviderResult(true); |
| | | } |
| | | |
| | | /** |
| | | * The preOperation phase for the add Operation. |
| | | * Its job is to generate the Synchronization context associated to the |
| | | * operation. It is necessary to do it in this phase because contrary to |
| | | * the other operations, the entry uid is not set when the handleConflict |
| | | * phase is called. |
| | | * |
| | | * @param addOperation The Add Operation. |
| | | */ |
| | | public void doPreOperation(AddOperation addOperation) |
| | | { |
| | | AddContext ctx = new AddContext(generateChangeNumber(addOperation), |
| | | Historical.getEntryUuid(addOperation), |
| | | findEntryId(addOperation.getEntryDN().getParent())); |
| | | |
| | | addOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | | |
| | | /** |
| | | * Receive an update message from the changelog. |
| | |
| | | |
| | | /** |
| | | * Do the necessary processing when an UpdateMessage was received. |
| | | * |
| | | * @param update The received UpdateMessage. |
| | | */ |
| | | public void receiveUpdate(UpdateMessage update) |
| | |
| | | { |
| | | numReplayedPostOpCalled++; |
| | | UpdateMessage msg = null; |
| | | ChangeNumber curChangeNumber = |
| | | (ChangeNumber) op.getAttachment(SYNCHRONIZATION); |
| | | ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op); |
| | | |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | if (curChangeNumber != null) |
| | | { |
| | | /* |
| | | * This code can be executed by multiple threads |
| | | * Since TreeMap is not synchronized, it is mandatory to synchronize |
| | | * it now. |
| | | */ |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | } |
| | | } |
| | | return; |
| | | } |
| | | ResultCode result = op.getResultCode(); |
| | | boolean isAssured = isAssured(op); |
| | | |
| | | if (!op.isSynchronizationOperation()) |
| | | if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation())) |
| | | { |
| | | switch (op.getOperationType()) |
| | | msg = UpdateMessage.generateMsg(op, isAssured); |
| | | |
| | | if (msg == null) |
| | | { |
| | | case MODIFY : |
| | | msg = new ModifyMsg((ModifyOperation) op); |
| | | break; |
| | | case ADD: |
| | | msg = new AddMsg((AddOperation) op); |
| | | break; |
| | | case DELETE : |
| | | msg = new DeleteMsg((DeleteOperation) op); |
| | | break; |
| | | case MODIFY_DN : |
| | | msg = new ModifyDNMsg((ModifyDNOperation) op); |
| | | break; |
| | | default : |
| | | /* |
| | | * This is an operation type that we do not know about |
| | | * It should never happen |
| | | * This code can be executed by multiple threads |
| | | * Since TreeMap is not synchronized, it is mandatory to synchronize |
| | | * it now. |
| | | * It should never happen. |
| | | */ |
| | | synchronized (pendingChanges) |
| | | { |
| | |
| | | return; |
| | | } |
| | | } |
| | | if (isAssured(op)) |
| | | { |
| | | msg.setAssured(); |
| | | } |
| | | } |
| | | |
| | | synchronized(pendingChanges) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(curChangeNumber); |
| | | if (curChange == null) |
| | | if (result == ResultCode.SUCCESS) |
| | | { |
| | | // This should never happen |
| | | int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; |
| | | String message = getMessage(msgID, curChangeNumber.toString(), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | curChange.setCommitted(true); |
| | | |
| | | if (op.isSynchronizationOperation()) |
| | | curChange.setOp(op); |
| | | else |
| | | curChange.setMsg(msg); |
| | | |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | ChangeNumber lastCommittedChangeNumber = null; |
| | | |
| | | if (!op.isSynchronizationOperation() && msg.isAssured()) |
| | | { |
| | | waitingAckMsgs.put(curChangeNumber, msg); |
| | | } |
| | | |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | if (firstChange.getOp().isSynchronizationOperation() == false) |
| | | PendingChange curChange = pendingChanges.get(curChangeNumber); |
| | | if (curChange == null) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | | // This should never happen |
| | | int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; |
| | | String message = getMessage(msgID, curChangeNumber.toString(), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | curChange.setCommitted(true); |
| | | |
| | | lastCommittedChangeNumber = firstChange.getChangeNumber(); |
| | | |
| | | pendingChanges.remove(lastCommittedChangeNumber); |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | firstChange = null; |
| | | } |
| | | if (op.isSynchronizationOperation()) |
| | | curChange.setOp(op); |
| | | else |
| | | curChange.setMsg(msg); |
| | | |
| | | if (!op.isSynchronizationOperation() && isAssured && (msg != null)) |
| | | { |
| | | firstChangeNumber = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstChangeNumber); |
| | | waitingAckMsgs.put(curChangeNumber, msg); |
| | | } |
| | | } |
| | | if (lastCommittedChangeNumber != null) |
| | | state.update(lastCommittedChangeNumber); |
| | | else if (!op.isSynchronizationOperation()) |
| | | pendingChanges.remove(curChangeNumber); |
| | | |
| | | pushCommittedChanges(); |
| | | } |
| | | |
| | | if (!op.isSynchronizationOperation() && msg.isAssured()) |
| | | if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null) |
| | | && (result == ResultCode.SUCCESS)) |
| | | { |
| | | synchronized (msg) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if an operation must be processed as an assured operation. |
| | | * |
| | | * @param op the operation to be checked. |
| | | * @return true if the operations must be processed as an assured operation. |
| | | */ |
| | | private boolean isAssured(Operation op) |
| | | { |
| | | // TODO : should have a filtering mechanism for checking |
| | | // operation that are assured and operations that are not. |
| | | return assuredFlag; |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates received by the synchronization plugin. |
| | | * |
| | | * @return the number of updates received |
| | |
| | | } |
| | | |
| | | /** |
| | | * Generate and set the ChangeNumber of a given Operation. |
| | | * |
| | | * @param operation The Operation for which the ChangeNumber must be set. |
| | | */ |
| | | public void setChangeNumber(Operation operation) |
| | | { |
| | | ChangeNumber changeNumber = |
| | | (ChangeNumber) operation.getAttachment(SYNCHRONIZATION); |
| | | if (changeNumber == null) |
| | | { |
| | | synchronized(pendingChanges) |
| | | { |
| | | changeNumber = changeNumberGenerator.NewChangeNumber(); |
| | | pendingChanges.put(changeNumber, new PendingChange(changeNumber, |
| | | operation, null)); |
| | | } |
| | | operation.setAttachment(SYNCHRONIZATION, changeNumber); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | |
| | | synchroThreads = new ArrayList<ListenerThread>(); |
| | | for (int i=0; i<10; i++) |
| | | { |
| | | ListenerThread myThread = new ListenerThread(this, changeNumberGenerator); |
| | | ListenerThread myThread = new ListenerThread(this); |
| | | myThread.start(); |
| | | synchroThreads.add(myThread); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Create and replay a synchronized Operation from an UpdateMessage. |
| | | * |
| | | * @param msg The UpdateMessage to be replayed. |
| | | */ |
| | | public void replay(UpdateMessage msg) |
| | | { |
| | | Operation op = null; |
| | | boolean done = false; |
| | | ChangeNumber changeNumber = null; |
| | | |
| | | try |
| | | { |
| | | while (!done) |
| | | { |
| | | op = msg.createOperation(conn); |
| | | |
| | | op.setInternalOperation(true); |
| | | op.setSynchronizationOperation(true); |
| | | changeNumber = OperationContext.getChangeNumber(op); |
| | | if (changeNumber != null) |
| | | changeNumberGenerator.adjust(changeNumber); |
| | | |
| | | op.run(); |
| | | |
| | | ResultCode result = op.getResultCode(); |
| | | if (result != ResultCode.SUCCESS) |
| | | { |
| | | if (op instanceof ModifyOperation) |
| | | { |
| | | ModifyOperation newOp = (ModifyOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | else if (op instanceof DeleteOperation) |
| | | { |
| | | DeleteOperation newOp = (DeleteOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | else if (op instanceof AddOperation) |
| | | { |
| | | AddOperation newOp = (AddOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | |
| | | } else if (op instanceof ModifyDNOperation) |
| | | { |
| | | ModifyDNOperation newOp = (ModifyDNOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | else |
| | | { |
| | | done = true; // unknown type of operation ?! |
| | | } |
| | | } |
| | | else |
| | | { |
| | | done = true; |
| | | } |
| | | } |
| | | } |
| | | catch (ASN1Exception e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID, msg) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | catch (LDAPException e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID, msg) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | catch (DataFormatException e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID, msg) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (changeNumber != null) |
| | | { |
| | | /* |
| | | * An Exception happened during the replay process. |
| | | * Continue with the next change but the servers will know start |
| | | * to be inconsistent. |
| | | * TODO : REPAIR : Should let the repair tool know about this |
| | | */ |
| | | int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION; |
| | | String message = getMessage(msgID, stackTraceToSingleLineString(e), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, message, msgID); |
| | | updateError(changeNumber); |
| | | } |
| | | else |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID, stackTraceToSingleLineString(e), |
| | | msg.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, message, msgID); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This methods is called when an error happends while replaying |
| | | * and operation. |
| | | * It is necessary because the postOPeration does not always get |
| | |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(changeNumber); |
| | | pushCommittedChanges(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Generate a new change number and insert it in the pending list. |
| | | * |
| | | * @param operation The operation for which the change number must be |
| | | * generated. |
| | | * @return The new change number. |
| | | */ |
| | | private ChangeNumber generateChangeNumber(Operation operation) |
| | | { |
| | | ChangeNumber changeNumber; |
| | | synchronized(pendingChanges) |
| | | { |
| | | changeNumber = changeNumberGenerator.NewChangeNumber(); |
| | | pendingChanges.put(changeNumber, |
| | | new PendingChange(changeNumber, operation, null)); |
| | | } |
| | | return changeNumber; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Find the Unique Id of the entry with the provided DN by doing a |
| | | * search of the entry and extracting its uniqueID from its attributes. |
| | | * |
| | | * @param dn The dn of the entry for which the unique Id is searched. |
| | | * |
| | | * @return The unique Id of the entry whith the provided DN. |
| | | */ |
| | | private String findEntryId(DN dn) |
| | | { |
| | | if (dn == null) |
| | | return null; |
| | | try |
| | | { |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(ENTRYUIDNAME); |
| | | InternalSearchOperation search = conn.processSearch(dn, |
| | | SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, |
| | | SearchFilter.createFilterFromString("objectclass=*"), |
| | | attrs); |
| | | |
| | | if (search.getResultCode() == ResultCode.SUCCESS) |
| | | { |
| | | LinkedList<SearchResultEntry> result = search.getSearchEntries(); |
| | | if (!result.isEmpty()) |
| | | { |
| | | SearchResultEntry resultEntry = result.getFirst(); |
| | | if (resultEntry != null) |
| | | { |
| | | return Historical.getEntryUuid(resultEntry); |
| | | } |
| | | } |
| | | } |
| | | } catch (DirectoryException e) |
| | | { |
| | | // never happens because the filter is always valid. |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * find the current dn of an entry from its entry uuid. |
| | | * |
| | | * @param uuid the Entry Unique ID. |
| | | * @return The curernt dn of the entry or null if there is no entry with |
| | | * the specified uuid. |
| | | */ |
| | | private DN findEntryDN(String uuid) |
| | | { |
| | | try |
| | | { |
| | | InternalSearchOperation search = conn.processSearch(baseDN, |
| | | SearchScope.WHOLE_SUBTREE, |
| | | SearchFilter.createFilterFromString("entryuuid="+uuid)); |
| | | if (search.getResultCode() == ResultCode.SUCCESS) |
| | | { |
| | | LinkedList<SearchResultEntry> result = search.getSearchEntries(); |
| | | if (!result.isEmpty()) |
| | | { |
| | | SearchResultEntry resultEntry = result.getFirst(); |
| | | if (resultEntry != null) |
| | | { |
| | | return resultEntry.getDN(); |
| | | } |
| | | } |
| | | } |
| | | } catch (DirectoryException e) |
| | | { |
| | | // never happens because the filter is always valid. |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Solve a conflict detected when replaying a modify operation. |
| | | * |
| | | * @param op The operation that triggered the conflict detection. |
| | | * @param msg The operation that triggered the conflict detection. |
| | | * @return true if the process is completed, false if it must continue.. |
| | | */ |
| | | private boolean solveNamingConflict(ModifyOperation op, |
| | | UpdateMessage msg) |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); |
| | | String entryUid = ctx.getEntryUid(); |
| | | |
| | | if (result == ResultCode.NO_SUCH_OBJECT) |
| | | { |
| | | /* |
| | | * This error may happen the operation is a modification but |
| | | * the entry had been renamed on a different master in the same time. |
| | | * search if the entry has been renamed, and return the new dn |
| | | * of the entry. |
| | | */ |
| | | msg.setDn(findEntryDN(entryUid).toString()); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** Solve a conflict detected when replaying a delete operation. |
| | | * |
| | | * @param op The operation that triggered the conflict detection. |
| | | * @param msg The operation that triggered the conflict detection. |
| | | * @return true if the process is completed, false if it must continue.. |
| | | */ |
| | | private boolean solveNamingConflict(DeleteOperation op, |
| | | UpdateMessage msg) |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); |
| | | String entryUid = ctx.getEntryUid(); |
| | | |
| | | if (result == ResultCode.NO_SUCH_OBJECT) |
| | | { |
| | | /* |
| | | * Find if the entry is still in the database. |
| | | */ |
| | | DN currentDn = findEntryDN(entryUid); |
| | | if (currentDn == null) |
| | | { |
| | | /* |
| | | * The entry has already been deleted, either because this delete |
| | | * has already been replayed or because another concurrent delete |
| | | * has already done the job. |
| | | * In any case, there is is nothing more to do. |
| | | */ |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * This entry has been renamed, replay the delete using its new DN. |
| | | */ |
| | | msg.setDn(currentDn.toString()); |
| | | return false; |
| | | } |
| | | } |
| | | else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF) |
| | | { |
| | | /* |
| | | * This may happen when we replay a DELETE done on a master |
| | | * but children of this entry have been added on another master. |
| | | */ |
| | | |
| | | /* |
| | | * TODO : either delete all the childs or rename the child below |
| | | * the top suffix by adding entryuuid in dn and delete this entry. |
| | | */ |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Solve a conflict detected when replaying a ADD operation. |
| | | * |
| | | * @param op The operation that triggered the conflict detection. |
| | | * @param msg The operation that triggered the conflict detection. |
| | | * @return true if the process is completed, false if it must continue. |
| | | * @throws Exception When the operation is not valid. |
| | | */ |
| | | private boolean solveNamingConflict(AddOperation op, |
| | | UpdateMessage msg) throws Exception |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); |
| | | String entryUid = ctx.getEntryUid(); |
| | | String parentUniqueId = ctx.getParentUid(); |
| | | |
| | | if (result == ResultCode.NO_SUCH_OBJECT) |
| | | { |
| | | /* |
| | | * This can happen if the parent has been renamed or deleted |
| | | * find the parent dn and calculate a new dn for the entry |
| | | */ |
| | | if (parentUniqueId == null) |
| | | { |
| | | /* |
| | | * This entry is the base dn of the backend. |
| | | * It is quite weird that the operation result be NO_SUCH_OBJECT. |
| | | * There is notthing more we can do except TODO log a |
| | | * message for the repair tool to look at this problem. |
| | | */ |
| | | return true; |
| | | } |
| | | DN parentDn = findEntryDN(parentUniqueId); |
| | | if (parentDn == null) |
| | | { |
| | | /* |
| | | * The parent has been deleted, so this entry should not |
| | | * exist don't do the ADD. |
| | | */ |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | RDN entryRdn = op.getEntryDN().getRDN(); |
| | | msg.setDn(parentDn + "," + entryRdn); |
| | | return false; |
| | | } |
| | | } |
| | | else if (result == ResultCode.ENTRY_ALREADY_EXISTS) |
| | | { |
| | | /* |
| | | * This can happen if |
| | | * - two adds are done on different servers but with the |
| | | * same target DN. |
| | | * - the same ADD is being replayed for the second time on this server. |
| | | * if the nsunique ID already exist, assume this is a replay and |
| | | * don't do anything |
| | | * if the entry unique id do not exist, generate conflict. |
| | | */ |
| | | if (findEntryDN(entryUid) != null) |
| | | { |
| | | // entry already exist : this is a replay |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | addConflict(op); |
| | | msg.setDn(generateConflictDn(entryUid, msg.getDn())); |
| | | return false; |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Solve a conflict detected when replaying a Modify DN operation. |
| | | * |
| | | * @param op The operation that triggered the conflict detection. |
| | | * @param msg The operation that triggered the conflict detection. |
| | | * @return true if the process is completed, false if it must continue. |
| | | * @throws Exception When the operation is not valid. |
| | | */ |
| | | private boolean solveNamingConflict(ModifyDNOperation op, |
| | | UpdateMessage msg) throws Exception |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); |
| | | String entryUid = ctx.getEntryUid(); |
| | | String newSuperiorID = ctx.getNewParentId(); |
| | | |
| | | if (result == ResultCode.NO_SUCH_OBJECT) |
| | | { |
| | | ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; |
| | | |
| | | /* |
| | | * four possible cases : |
| | | * - the modified entry has been renamed |
| | | * - the new parent has been renamed |
| | | * - the operation is replayed for the second time. |
| | | * - the entry has been deleted |
| | | * action : |
| | | * - change the target dn and the new parent dn and |
| | | * restart the operation, |
| | | * - don't do anything if the operation is replayed. |
| | | */ |
| | | |
| | | // Construct the new DN to use for the entry. |
| | | DN entryDN = op.getEntryDN(); |
| | | DN newSuperior = findEntryDN(newSuperiorID); |
| | | RDN newRDN = op.getNewRDN(); |
| | | DN parentDN; |
| | | |
| | | if (newSuperior == null) |
| | | { |
| | | parentDN = entryDN.getParent(); |
| | | } |
| | | else |
| | | { |
| | | parentDN = newSuperior; |
| | | } |
| | | |
| | | if ((parentDN == null) || parentDN.isNullDN()) |
| | | { |
| | | /* this should never happen |
| | | * can't solve any conflict in this case. |
| | | */ |
| | | throw new Exception("operation parameters are invalid"); |
| | | } |
| | | |
| | | RDN[] parentComponents = parentDN.getRDNComponents(); |
| | | RDN[] newComponents = new RDN[parentComponents.length+1]; |
| | | System.arraycopy(parentComponents, 0, newComponents, 1, |
| | | parentComponents.length); |
| | | newComponents[0] = newRDN; |
| | | |
| | | DN newDN = new DN(newComponents); |
| | | |
| | | // get the current DN of this entry in the database. |
| | | DN currentDN = findEntryDN(entryUid); |
| | | |
| | | // if the newDN and the current DN match then the operation |
| | | // is a no-op (this was probably a second replay) |
| | | // don't do anything. |
| | | if (newDN.equals(currentDN)) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | msg.setDn(currentDN.toString()); |
| | | modifyDnMsg.setNewSuperior(newSuperior.toString()); |
| | | return false; |
| | | } |
| | | else if (result == ResultCode.ENTRY_ALREADY_EXISTS) |
| | | { |
| | | /* |
| | | * This may happen when two modifyDn operation |
| | | * are done on different servers but with the same target DN |
| | | * add the conflict object class to the entry |
| | | * and rename it using its entryuuid. |
| | | */ |
| | | generateAddConflictOp(op); |
| | | msg.setDn(generateConflictDn(entryUid, msg.getDn())); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Generate a modification to add the conflict ObjectClass to an entry |
| | | * whose Dn is now conflicting with another entry. |
| | | * |
| | | * @param op The operation causing the conflict. |
| | | */ |
| | | private void generateAddConflictOp(ModifyDNOperation op) |
| | | { |
| | | // TODO |
| | | } |
| | | |
| | | /** |
| | | * Add the conflict object class to an entry that could |
| | | * not be added because it is conflicting with another entry. |
| | | * |
| | | * @param addOp The conflicting Add Operation. |
| | | */ |
| | | private void addConflict(AddOperation addOp) |
| | | { |
| | | /* |
| | | * TODO |
| | | */ |
| | | } |
| | | |
| | | /** |
| | | * Generate the Dn to use for a conflicting entry. |
| | | * |
| | | * @param op Operation that generated the conflict |
| | | * @param dn Original dn. |
| | | * @return The generated Dn for a conflicting entry. |
| | | */ |
| | | private String generateConflictDn(String entryUid, String dn) |
| | | { |
| | | return dn + "entryuuid=" + entryUid; |
| | | } |
| | | |
| | | /** |
| | | * Check if an operation must be processed as an assured operation. |
| | | * |
| | | * @param op the operation to be checked. |
| | | * @return true if the operations must be processed as an assured operation. |
| | | */ |
| | | private boolean isAssured(Operation op) |
| | | { |
| | | // TODO : should have a filtering mechanism for checking |
| | | // operation that are assured and operations that are not. |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the changelog service. |
| | | * PRECONDITION : The pendingChanges lock must be held before calling |
| | | * this method. |
| | | */ |
| | | private void pushCommittedChanges() |
| | | { |
| | | if (pendingChanges.isEmpty()) |
| | | return; |
| | | |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | if (firstChange.getOp().isSynchronizationOperation() == false) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | | } |
| | | state.update(firstChangeNumber); |
| | | pendingChanges.remove(firstChangeNumber); |
| | | |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | firstChange = null; |
| | | } |
| | | else |
| | | { |
| | | firstChangeNumber = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstChangeNumber); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |