| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | |
| | | { |
| | | private ReplicationMonitor monitor; |
| | | |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | private ReplicationBroker broker; |
| | | |
| | | private List<ListenerThread> synchroThreads = |
| | | new ArrayList<ListenerThread>(); |
| | | private final SortedMap<ChangeNumber, PendingChange> pendingChanges = |
| | | new TreeMap<ChangeNumber, PendingChange>(); |
| | | private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs = |
| | | new TreeMap<ChangeNumber, UpdateMessage>(); |
| | | private int numRcvdUpdates = 0; |
| | | private int numSentUpdates = 0; |
| | | private AtomicInteger numRcvdUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numSentUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(); |
| | | private int debugCount = 0; |
| | | private PersistentServerState state; |
| | |
| | | private int maxSendDelay = 0; |
| | | |
| | | /** |
| | | * This object is used to store the list of updates currently being |
| | | * done on the local database. |
| | | * It contains both the updates that are done directly on this server |
| | | * and the updates that were done on another server, transmitted |
| | | * by the replication server and that are currently being replayed. |
| | | * It is useful to make sure that dependencies between operations |
| | | * are correctly fulfilled, that the local operations are sent in the |
| | | * correct order to the replication server and that the ServerState |
| | | * is not updated too early. |
| | | */ |
| | | private PendingChanges pendingChanges; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | |
| | | private ConfigEntry backendConfigEntry; |
| | | private List<DN> branches = new ArrayList<DN>(0); |
| | | |
| | | private int listenerThreadNumber = 1; |
| | | private int listenerThreadNumber = 10; |
| | | private boolean receiveStatus = true; |
| | | |
| | | private Collection<String> replicationServers; |
| | |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, state); |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | try |
| | |
| | | */ |
| | | } |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | pendingChanges = |
| | | new PendingChanges(new ChangeNumberGenerator(serverId, state), |
| | | broker, state); |
| | | |
| | | // listen for changes on the configuration |
| | | configuration.addChangeListener(this); |
| | | } |
| | |
| | | * of the parent entry |
| | | */ |
| | | |
| | | // There is a potential of perfs improvement here |
| | | // if we could avoid the following parent entry retrieval |
| | | DN parentDnFromCtx = findEntryDN(ctx.getParentUid()); |
| | | |
| | | if (parentDnFromCtx != null) |
| | | String parentUid = ctx.getParentUid(); |
| | | // The root entry has no parent, |
| | | // so there is no need to check for it. |
| | | if (parentUid != null) |
| | | { |
| | | DN entryDN = addOperation.getEntryDN(); |
| | | DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); |
| | | if ((parentDnFromEntryDn != null) |
| | | && (!parentDnFromCtx.equals(parentDnFromEntryDn))) |
| | | // There is a potential of perfs improvement here |
| | | // if we could avoid the following parent entry retrieval |
| | | DN parentDnFromCtx = findEntryDN(ctx.getParentUid()); |
| | | |
| | | if (parentDnFromCtx == null) |
| | | { |
| | | // parentEntry has been renamed |
| | | // replication name conflict resolution is expected to fix that |
| | | // later in the flow |
| | | // The parent does not exist with the specified unique id |
| | | // stop the operation with NO_SUCH_OBJECT and let the |
| | | // conflict resolution or the dependency resolution solve this. |
| | | addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | else |
| | | { |
| | | DN entryDN = addOperation.getEntryDN(); |
| | | DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); |
| | | if ((parentDnFromEntryDn != null) |
| | | && (!parentDnFromCtx.equals(parentDnFromEntryDn))) |
| | | { |
| | | // parentEntry has been renamed |
| | | // replication name conflict resolution is expected to fix that |
| | | // later in the flow |
| | | addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return new SynchronizationProviderResult(true); |
| | |
| | | */ |
| | | // Returns the next UpdateMessage, reading replication messages from the |
| | | // broker and dispatching on their type until one is an UpdateMessage; |
| | | // returns null when broker.receive() returns null. |
| | | // NOTE(review): this body appears to interleave two revisions of the same |
| | | // method: `update` is declared twice, `synchronized (broker)` and the |
| | | // instanceof dispatch chain each appear twice, and `return update;` is |
| | | // repeated. Confirm the intended revision against version control before |
| | | // changing any logic here. |
| | | public UpdateMessage receive() |
| | | { |
| | | synchronized (broker) |
| | | // presumably returns an update that is already queued, if any |
| | | // TODO confirm against the PendingChanges class |
| | | UpdateMessage update = pendingChanges.getNextUpdate(); |
| | | |
| | | if (update == null) |
| | | { |
| | | UpdateMessage update = null; |
| | | while (update == null) |
| | | synchronized (broker) |
| | | { |
| | | ReplicationMessage msg; |
| | | try |
| | | while (update == null) |
| | | { |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | ReplicationMessage msg; |
| | | try |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | log("Broker received message :" + msg); |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | receiveAck(ack); |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; |
| | | try |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | { |
| | | initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | null); |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | catch(DirectoryException de) |
| | | log("Broker received message :" + msg); |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | AckMessage ack = (AckMessage) msg; |
| | | receiveAck(ack); |
| | | } |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; |
| | | try |
| | | { |
| | | initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | null); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; |
| | | |
| | | try |
| | | { |
| | | importBackend(initMsg); |
| | | try |
| | | { |
| | | importBackend(initMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Return an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | log(getMessage(msgID, |
| | | backend.getBackendID()) + de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | catch(DirectoryException de) |
| | | else if (msg instanceof ErrorMessage) |
| | | { |
| | | // Return an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | log(getMessage(msgID, backend.getBackendID()) + de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | // just retry |
| | | } |
| | | } catch (SocketTimeoutException e) |
| | | { |
| | | // just retry |
| | | } |
| | | |
| | | } |
| | | return update; |
| | | } |
| | | return update; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | // Records an update received from the replication server: the message |
| | | // is handed to pendingChanges.putRemoteUpdate() and the received-update |
| | | // counter is incremented exactly once per call. |
| | | public void receiveUpdate(UpdateMessage update) |
| | | { |
| | |   pendingChanges.putRemoteUpdate(update); |
| | |   // numRcvdUpdates is an AtomicInteger, so no external lock is needed |
| | |   // for the increment. |
| | |   numRcvdUpdates.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | |
| | | UpdateMessage update; |
| | | ChangeNumber changeNumber = ack.getChangeNumber(); |
| | | |
| | | synchronized (pendingChanges) |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.get(changeNumber); |
| | | waitingAckMsgs.remove(changeNumber); |
| | | update = waitingAckMsgs.remove(changeNumber); |
| | | } |
| | | if (update != null) |
| | | { |
| | |
| | | * This is an operation type that we do not know about |
| | | * It should never happen. |
| | | */ |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | int msgID = MSGID_UNKNOWN_TYPE; |
| | | String message = getMessage(msgID, op.getOperationType().toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | pendingChanges.remove(curChangeNumber); |
| | | int msgID = MSGID_UNKNOWN_TYPE; |
| | | String message = getMessage(msgID, op.getOperationType().toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | synchronized(pendingChanges) |
| | | if (result == ResultCode.SUCCESS) |
| | | { |
| | | if (result == ResultCode.SUCCESS) |
| | | try |
| | | { |
| | | PendingChange curChange = pendingChanges.get(curChangeNumber); |
| | | if (curChange == null) |
| | | { |
| | | // This should never happen |
| | | int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; |
| | | String message = getMessage(msgID, curChangeNumber.toString(), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | curChange.setCommitted(true); |
| | | pendingChanges.commit(curChangeNumber, op, msg); |
| | | } |
| | | catch (NoSuchElementException e) |
| | | { |
| | | int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; |
| | | String message = getMessage(msgID, curChangeNumber.toString(), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | |
| | | if (op.isSynchronizationOperation()) |
| | | curChange.setOp(op); |
| | | else |
| | | curChange.setMsg(msg); |
| | | |
| | | if (msg != null && isAssured) |
| | | if (msg != null && isAssured) |
| | | { |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | // Add the assured message to the list of those whose acknowledgements |
| | | // we are awaiting. |
| | | // Add the assured message to the list of update that are |
| | | // waiting acknowledgements |
| | | waitingAckMsgs.put(curChangeNumber, msg); |
| | | } |
| | | } |
| | | else if (!op.isSynchronizationOperation()) |
| | | { |
| | | // Remove an unsuccessful non-replication operation from the pending |
| | | // changes list. |
| | | if (curChangeNumber != null) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | } |
| | | } |
| | | |
| | | pushCommittedChanges(); |
| | | } |
| | | else if (!op.isSynchronizationOperation()) |
| | | { |
| | | // Remove an unsuccessful non-replication operation from the pending |
| | | // changes list. |
| | | if (curChangeNumber != null) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | } |
| | | } |
| | | |
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | |
| | | // Wait for acknowledgement of an assured message. |
| | | if (msg != null && isAssured) |
| | |
| | | */ |
| | | public int getNumRcvdUpdates() |
| | | { |
| | |   // The counter is an AtomicInteger; return its current value. |
| | |   return numRcvdUpdates.get(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getNumSentUpdates() |
| | | { |
| | |   // The counter is an AtomicInteger; return its current value. |
| | |   return numSentUpdates.get(); |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates in the pending list. |
| | | * Get the number of updates in the pending list. |
| | | * |
| | | * @return The number of updates in the pending list |
| | | */ |
| | |
| | | { |
| | | Operation op = null; |
| | | boolean done = false; |
| | | boolean dependency = false; |
| | | ChangeNumber changeNumber = null; |
| | | int retryCount = 10; |
| | | boolean firstTry = true; |
| | | |
| | | try |
| | | { |
| | | while (!done && retryCount-- > 0) |
| | | while ((!dependency) && (!done) && (retryCount-- > 0)) |
| | | { |
| | | op = msg.createOperation(conn); |
| | | |
| | | op.setInternalOperation(true); |
| | | op.setSynchronizationOperation(true); |
| | | changeNumber = OperationContext.getChangeNumber(op); |
| | | if (changeNumber != null) |
| | | changeNumberGenerator.adjust(changeNumber); |
| | | |
| | | op.run(); |
| | | |
| | | ResultCode result = op.getResultCode(); |
| | | |
| | | if (result != ResultCode.SUCCESS) |
| | | { |
| | | if (op instanceof ModifyOperation) |
| | | { |
| | | ModifyOperation newOp = (ModifyOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | if (!dependency) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else if (op instanceof DeleteOperation) |
| | | { |
| | | DeleteOperation newOp = (DeleteOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else if (op instanceof AddOperation) |
| | | { |
| | | AddOperation newOp = (AddOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | |
| | | } else if (op instanceof ModifyDNOperation) |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | if (!dependency) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else if (op instanceof ModifyDNOperation) |
| | | { |
| | | ModifyDNOperation newOp = (ModifyDNOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | ModifyDNMsg newMsg = (ModifyDNMsg) msg; |
| | | dependency = pendingChanges.checkDependencies(newMsg); |
| | | if (!dependency) |
| | | { |
| | | ModifyDNOperation newOp = (ModifyDNOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | done = true; |
| | | } |
| | | firstTry = false; |
| | | } |
| | | |
| | | if (!done) |
| | | if (!done && !dependency) |
| | | { |
| | | // Continue with the next change but the servers could now become |
| | | // inconsistent. |
| | |
| | | String message = getMessage(msgID, op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, message, msgID); |
| | | |
| | | updateError(changeNumber); |
| | | } |
| | | } |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |
| | | if (!dependency) |
| | | { |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | // Marks the change identified by changeNumber as committed in |
| | | // pendingChanges, then pushes the committed changes and adds the |
| | | // number pushed to the sent-update counter. |
| | | public void updateError(ChangeNumber changeNumber) |
| | | { |
| | |   pendingChanges.commit(changeNumber); |
| | |   int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | |   numSentUpdates.addAndGet(pushedChanges); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | // Registers a local operation with pendingChanges and returns the |
| | | // ChangeNumber that putLocalOperation() hands back for it. |
| | | private ChangeNumber generateChangeNumber(Operation operation) |
| | | { |
| | |   return pendingChanges.putLocalOperation(operation); |
| | | } |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the replicationServer service. |
| | | * Starting from the first key of pendingChanges, each change that is |
| | | * marked committed is removed from pendingChanges and recorded in the |
| | | * server state; the loop stops at the first change that is not |
| | | * committed, or when pendingChanges becomes empty. |
| | | * PRECONDITION : The pendingChanges lock must be held before calling |
| | | * this method. |
| | | * NOTE(review): this method uses pendingChanges through map-style calls |
| | | * (isEmpty, firstKey, get, remove) and increments numSentUpdates with ++, |
| | | * while other code in this file calls pendingChanges.pushCommittedChanges() |
| | | * and numSentUpdates.addAndGet(...). Presumably one of the two forms is a |
| | | * leftover from a refactoring; confirm which one is current. |
| | | */ |
| | | private void pushCommittedChanges() |
| | | { |
| | | // Nothing pending: nothing to push. |
| | | if (pendingChanges.isEmpty()) |
| | | return; |
| | | |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | |
| | | // Consume changes from the head only while they are committed. |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | // Only changes that carry an operation which is not a synchronization |
| | | // operation are counted and published to the broker. |
| | | if ((firstChange.getOp() != null ) && |
| | | (firstChange.getOp().isSynchronizationOperation() == false)) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | | } |
| | | // The state is updated and the change removed for every committed |
| | | // change, whether or not it was published. |
| | | state.update(firstChangeNumber); |
| | | pendingChanges.remove(firstChangeNumber); |
| | | |
| | | // Move on to the new head, or end the loop when nothing is left. |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | firstChange = null; |
| | | } |
| | | else |
| | | { |
| | | firstChangeNumber = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstChangeNumber); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |