| | |
| | | /** |
| | | * This object is used to store the list of updates currently being |
| | | * done on the local database. |
| | | * It contains both the updates that are done directly on this server |
| | | * and the updates that were done on another server, transmitted |
| | | * by the replication server and that are currently replayed. |
| | | * It is useful to make sure that dependencies between operations |
| | | * are correctly fulfilled, that the local operations are sent in a |
| | | * It is useful to make sure that the local operations are sent in a |
| | | * correct order to the replication server and that the ServerState |
| | | * is not updated too early. |
| | | */ |
| | | private PendingChanges pendingChanges; |
| | | |
| | | /** |
| | | * It contains the updates that were done on other servers, transmitted |
| | | * by the replication server and that are currently replayed. |
| | | * It is useful to make sure that dependencies between operations |
| | | * are correctly fulfilled and to make sure that the ServerState is |
| | | * not updated too early. |
| | | */ |
| | | private RemotePendingChanges remotePendingChanges; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | ChangeNumberGenerator generator = |
| | | new ChangeNumberGenerator(serverId, state); |
| | | |
| | | pendingChanges = |
| | | new PendingChanges(new ChangeNumberGenerator(serverId, state), |
| | | broker, state); |
| | | |
| | | remotePendingChanges = new RemotePendingChanges(generator, state); |
| | | |
| | | // listen for changes on the configuration |
| | | configuration.addChangeListener(this); |
| | | } |
| | |
| | | */ |
| | | public UpdateMessage receive() |
| | | { |
| | | UpdateMessage update = pendingChanges.getNextUpdate(); |
| | | UpdateMessage update = remotePendingChanges.getNextUpdate(); |
| | | |
| | | if (update == null) |
| | | { |
| | |
| | | */ |
| | | public void receiveUpdate(UpdateMessage update) | // Hands the given update to the pending-change lists and counts it as received.
| | | { |
| | | pendingChanges.putRemoteUpdate(update); | // NOTE(review): this call and the next enqueue the same update in two lists; they look like the removed and added sides of a diff shown together - confirm which one is current.
| | | remotePendingChanges.putRemoteUpdate(update); |
| | | numRcvdUpdates.incrementAndGet(); | // One more update received.
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | pendingChanges.commit(curChangeNumber, op, msg); |
| | | if (op.isSynchronizationOperation()) |
| | | { |
| | | remotePendingChanges.commit(curChangeNumber); |
| | | } |
| | | else |
| | | { |
| | | pendingChanges.commit(curChangeNumber, msg); |
| | | } |
| | | } |
| | | catch (NoSuchElementException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | if (!op.isSynchronizationOperation()) |
| | | { |
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | } |
| | | |
| | | // Wait for acknowledgement of an assured message. |
| | | if (msg != null && isAssured) |
| | |
| | | if (op instanceof ModifyOperation) |
| | | { |
| | | ModifyOperation newOp = (ModifyOperation) op; |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | dependency = remotePendingChanges.checkDependencies(newOp); |
| | | if (!dependency) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | |
| | | else if (op instanceof DeleteOperation) |
| | | { |
| | | DeleteOperation newOp = (DeleteOperation) op; |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | dependency = remotePendingChanges.checkDependencies(newOp); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | |
| | | { |
| | | AddOperation newOp = (AddOperation) op; |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | dependency = remotePendingChanges.checkDependencies(newOp); |
| | | if (!dependency) |
| | | { |
| | | done = solveNamingConflict(newOp, addMsg); |
| | |
| | | else if (op instanceof ModifyDNOperation) |
| | | { |
| | | ModifyDNMsg newMsg = (ModifyDNMsg) msg; |
| | | dependency = pendingChanges.checkDependencies(newMsg); |
| | | dependency = remotePendingChanges.checkDependencies(newMsg); |
| | | if (!dependency) |
| | | { |
| | | ModifyDNOperation newOp = (ModifyDNOperation) op; |
| | |
| | | */ |
| | | public void updateError(ChangeNumber changeNumber) | // Commits the given change number; presumably called when handling that update failed - confirm against callers.
| | | { |
| | | pendingChanges.commit(changeNumber); | // NOTE(review): the three pendingChanges statements and the remotePendingChanges commit below look like the removed and added sides of a diff shown together - confirm which is current.
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); | // The returned count feeds the sent-updates counter on the next line.
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | remotePendingChanges.commit(changeNumber); |
| | | } |
| | | |
| | | /** |