| opends/src/server/org/opends/server/replication/common/ServerState.java | ●●●●● patch | view | raw | blame | history | |
| opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | ●●●●● patch | view | raw | blame | history | |
| opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | ●●●●● patch | view | raw | blame | history | |
| opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | ●●●●● patch | view | raw | blame | history |
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -151,6 +151,7 @@ { if (changeNumber == null) return false; synchronized(this) { Short id = changeNumber.getServerId(); @@ -181,9 +182,9 @@ public Set<String> toStringSet() { HashSet<String> set = new HashSet<String>(); synchronized (this) { for (Short key : list.keySet()) { ChangeNumber change = list.get(key); @@ -204,6 +205,7 @@ public ArrayList<ASN1OctetString> toASN1ArrayList() { ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(); synchronized (this) { for (Short id : list.keySet()) @@ -215,7 +217,7 @@ return values; } /** * return the text representation of ServerState. * Return the text representation of ServerState. * @return the text representation of ServerState */ @Override opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -28,28 +28,23 @@ import java.util.NoSuchElementException; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.OperationContext; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.types.DN; import org.opends.server.types.Operation; /** * * This class is use to store the list of operations currently * This class is use to store the list of local operations currently * in progress and not yet committed in the database. * * It is used to make sure that operations are sent to the Replication * Server in the order defined by their ChangeNumber. * It is also used to update the ServerState at the appropriate time. * * On object of this class is instanciated for each ReplicationDomain. */ public class PendingChanges { @@ -60,14 +55,6 @@ new TreeMap<ChangeNumber, PendingChange>(); /** * A sorted set containing the list of PendingChanges that have * not been replayed correctly because they are dependent on * another change to be completed. */ private SortedSet<PendingChange> dependentChanges = new TreeSet<PendingChange>(); /** * The ChangeNumberGenerator to use to create new unique ChangeNumbers * for each operation done on the replication domain. */ @@ -129,13 +116,10 @@ * * @param changeNumber The ChangeNumber of the update message that must be * set as committed. * @param op The Operation associated of the update when the * update is a local update. * @param msg The message associated to the update when the update * was received from a replication server. * @param msg The message associated to the update. */ public synchronized void commit(ChangeNumber changeNumber, Operation op, UpdateMessage msg) UpdateMessage msg) { PendingChange curChange = pendingChanges.get(changeNumber); if (curChange == null) @@ -144,10 +128,7 @@ } curChange.setCommitted(true); if (op.isSynchronizationOperation()) curChange.setOp(op); else curChange.setMsg(msg); curChange.setMsg(msg); } /** @@ -186,21 +167,6 @@ } /** * Add a new UpdateMessage that was received from the replication server * to the pendingList. * * @param update The UpdateMessage that was received from the replication * server and that will be added to the pending list. */ public synchronized void putRemoteUpdate(UpdateMessage update) { ChangeNumber changeNumber = update.getChangeNumber(); changeNumberGenerator.adjust(changeNumber); pendingChanges.put(changeNumber, new PendingChange(changeNumber, null, update)); } /** * Push all committed local changes to the replicationServer service. * * @return The number of pushed updates. @@ -237,323 +203,4 @@ } return numSentUpdates; } /** * Get the first update in the list that have some dependencies cleared. * * @return The UpdateMessage to be handled. */ public synchronized UpdateMessage getNextUpdate() { /* * Parse the list of Update with dependencies and check if the dependencies * are now cleared until an Update withour dependencies is found. */ for (PendingChange change : dependentChanges) { if (change.dependenciesIsCovered(state)) { dependentChanges.remove(change); return change.getMsg(); } } return null; } /** * Check if the given AddOperation has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * AddOperation depends on * * - DeleteOperation done on the same DN * - ModifyDnOperation with the same target DN as the ADD DN * - ModifyDnOperation with new DN equals to the ADD DN parent * - AddOperation done on the parent DN of the ADD DN * * @param op The AddOperation to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(AddOperation op) { boolean hasDependencies = false; DN targetDn = op.getEntryDN(); ChangeNumber changeNumber = OperationContext.getChangeNumber(op); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) { /* * Check is the operation to be run is a deleteOperation on the * same DN. */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof AddMsg) { /* * Check if the operation to be run is an addOperation on a * parent of the current AddOperation. */ if (pendingChange.getTargetDN().isAncestorOf(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof ModifyDNMsg) { /* * Check if the operation to be run is ModifyDnOperation with * the same target DN as the ADD DN * or a ModifyDnOperation with new DN equals to the ADD DN parent */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } else { ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg(); if (pendingModDn.newDNIsParent(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } } } } } return hasDependencies; } /** * Check if the given ModifyOperation has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * * ModifyOperation depends on * - AddOperation done on the same DN * * @param op The ModifyOperation to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(ModifyOperation op) { boolean hasDependencies = false; DN targetDn = op.getEntryDN(); ChangeNumber changeNumber = OperationContext.getChangeNumber(op); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof AddMsg) { /* * Check if the operation to be run is an addOperation on a * same DN. */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } } } } return hasDependencies; } /** * Mark the first pendingChange as dependent on the second PendingChange. * @param dependentChange The PendingChange that depend on the second * PendingChange. * @param pendingChange The PendingChange on which the first PendingChange * is dependent. */ private void addDependency( PendingChange dependentChange, PendingChange pendingChange) { dependentChange.addDependency(pendingChange.getChangeNumber()); dependentChanges.add(dependentChange); } /** * Check if the given ModifyDNMsg has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * * Modify DN Operation depends on * - AddOperation done on the same DN as the target DN of the MODDN operation * - AddOperation done on the new parent of the MODDN operation * - DeleteOperation done on the new DN of the MODDN operation * - ModifyDNOperation done from the new DN of the MODDN operation * * @param msg The ModifyDNMsg to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(ModifyDNMsg msg) { boolean hasDependencies = false; ChangeNumber changeNumber = msg.getChangeNumber(); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; DN targetDn = change.getTargetDN(); for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) { // Check if the target of the Delete is the same // as the new DN of this ModifyDN if (msg.newDNIsEqual(pendingChange.getTargetDN())) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof AddMsg) { // Check if the Add Operation was done on the new parent of // the MODDN operation if (msg.newParentIsEqual(pendingChange.getTargetDN())) { hasDependencies = true; addDependency(change, pendingChange); } // Check if the AddOperation was done on the same DN as the // target DN of the MODDN operation if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof ModifyDNMsg) { // Check if the ModifyDNOperation was done from the new DN of // the MODDN operation if (msg.newDNIsEqual(pendingChange.getTargetDN())) { hasDependencies = true; addDependency(change, pendingChange); } } } } } return hasDependencies; } /** * Check if the given DeleteOperation has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * * DeleteOperation depends on * - DeleteOperation done on children DN * - ModifyDnOperation with target DN that are children of the DEL DN * - AddOperation done on the same DN * * * @param op The DeleteOperation to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(DeleteOperation op) { boolean hasDependencies = false; DN targetDn = op.getEntryDN(); ChangeNumber changeNumber = OperationContext.getChangeNumber(op); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) { /* * Check if the operation to be run is a deleteOperation on a * children of the current DeleteOperation. */ if (pendingChange.getTargetDN().isDescendantOf(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof AddMsg) { /* * Check if the operation to be run is an addOperation on a * parent of the current DeleteOperation. */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof ModifyDNMsg) { /* * Check if the operation to be run is an ModifyDNOperation * on a children of the current DeleteOperation */ if (pendingChange.getTargetDN().isDescendantOf(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } } } } return hasDependencies; } } opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
New file @@ -0,0 +1,469 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.util.NoSuchElementException; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.OperationContext; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.types.DN; /** * * This class is used to store the list of remote changes received * from a replication server and taht are either currently being replayed * or that are waiting for being replayed. * * It is used to know when the ServerState must be updated and to conpute * the dependencies between operations. * * One of this object is instanciated for each ReplicationDomain. * */ public class RemotePendingChanges { /** * A map used to store the pending changes. */ private SortedMap<ChangeNumber, PendingChange> pendingChanges = new TreeMap<ChangeNumber, PendingChange>(); /** * A sorted set containing the list of PendingChanges that have * not been replayed correctly because they are dependent on * another change to be completed. */ private SortedSet<PendingChange> dependentChanges = new TreeSet<PendingChange>(); /** * The ServerState that will be updated when UpdateMessage are fully replayed. */ private ServerState state; /** * The ChangeNumberGenerator to must be adjusted when new changes * are received from a remote server. */ private ChangeNumberGenerator changeNumberGenerator; /** * Creates a new RemotePendingChanges using the provided ServerState. * * @param changeNumberGenerator The ChangeNumberGenerator that should * be adjusted when changes are received. * @param state The ServerState that will be updated when UpdateMessage * have been fully replayed. */ public RemotePendingChanges(ChangeNumberGenerator changeNumberGenerator, ServerState state) { this.changeNumberGenerator = changeNumberGenerator; this.state = state; } /** * Add a new UpdateMessage that was received from the replication server * to the pendingList. * * @param update The UpdateMessage that was received from the replication * server and that will be added to the pending list. */ public synchronized void putRemoteUpdate(UpdateMessage update) { ChangeNumber changeNumber = update.getChangeNumber(); changeNumberGenerator.adjust(changeNumber); pendingChanges.put(changeNumber, new PendingChange(changeNumber, null, update)); } /** * Mark an update message as committed. * * @param changeNumber The ChangeNumber of the update message that must be * set as committed. */ public synchronized void commit(ChangeNumber changeNumber) { PendingChange curChange = pendingChanges.get(changeNumber); if (curChange == null) { throw new NoSuchElementException(); } curChange.setCommitted(true); ChangeNumber firstChangeNumber = pendingChanges.firstKey(); PendingChange firstChange = pendingChanges.get(firstChangeNumber); while ((firstChange != null) && firstChange.isCommitted()) { state.update(firstChangeNumber); pendingChanges.remove(firstChangeNumber); if (pendingChanges.isEmpty()) { firstChange = null; } else { firstChangeNumber = pendingChanges.firstKey(); firstChange = pendingChanges.get(firstChangeNumber); } } } /** * Get the first update in the list that have some dependencies cleared. * * @return The UpdateMessage to be handled. */ public synchronized UpdateMessage getNextUpdate() { /* * Parse the list of Update with dependencies and check if the dependencies * are now cleared until an Update withour dependencies is found. */ for (PendingChange change : dependentChanges) { if (change.dependenciesIsCovered(state)) { dependentChanges.remove(change); return change.getMsg(); } } return null; } /** * Mark the first pendingChange as dependent on the second PendingChange. * @param dependentChange The PendingChange that depend on the second * PendingChange. * @param pendingChange The PendingChange on which the first PendingChange * is dependent. */ private void addDependency( PendingChange dependentChange, PendingChange pendingChange) { dependentChange.addDependency(pendingChange.getChangeNumber()); dependentChanges.add(dependentChange); } /** * Check if the given AddOperation has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * AddOperation depends on * * - DeleteOperation done on the same DN * - ModifyDnOperation with the same target DN as the ADD DN * - ModifyDnOperation with new DN equals to the ADD DN parent * - AddOperation done on the parent DN of the ADD DN * * @param op The AddOperation to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(AddOperation op) { boolean hasDependencies = false; DN targetDn = op.getEntryDN(); ChangeNumber changeNumber = OperationContext.getChangeNumber(op); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) { /* * Check is the operation to be run is a deleteOperation on the * same DN. */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof AddMsg) { /* * Check if the operation to be run is an addOperation on a * parent of the current AddOperation. */ if (pendingChange.getTargetDN().isAncestorOf(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof ModifyDNMsg) { /* * Check if the operation to be run is ModifyDnOperation with * the same target DN as the ADD DN * or a ModifyDnOperation with new DN equals to the ADD DN parent */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } else { ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg(); if (pendingModDn.newDNIsParent(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } } } } } return hasDependencies; } /** * Check if the given ModifyOperation has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * * ModifyOperation depends on * - AddOperation done on the same DN * * @param op The ModifyOperation to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(ModifyOperation op) { boolean hasDependencies = false; DN targetDn = op.getEntryDN(); ChangeNumber changeNumber = OperationContext.getChangeNumber(op); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof AddMsg) { /* * Check if the operation to be run is an addOperation on a * same DN. */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } } } } return hasDependencies; } /** * Check if the given ModifyDNMsg has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * * Modify DN Operation depends on * - AddOperation done on the same DN as the target DN of the MODDN operation * - AddOperation done on the new parent of the MODDN operation * - DeleteOperation done on the new DN of the MODDN operation * - ModifyDNOperation done from the new DN of the MODDN operation * * @param msg The ModifyDNMsg to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(ModifyDNMsg msg) { boolean hasDependencies = false; ChangeNumber changeNumber = msg.getChangeNumber(); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; DN targetDn = change.getTargetDN(); for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) { // Check if the target of the Delete is the same // as the new DN of this ModifyDN if (msg.newDNIsEqual(pendingChange.getTargetDN())) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof AddMsg) { // Check if the Add Operation was done on the new parent of // the MODDN operation if (msg.newParentIsEqual(pendingChange.getTargetDN())) { hasDependencies = true; addDependency(change, pendingChange); } // Check if the AddOperation was done on the same DN as the // target DN of the MODDN operation if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof ModifyDNMsg) { // Check if the ModifyDNOperation was done from the new DN of // the MODDN operation if (msg.newDNIsEqual(pendingChange.getTargetDN())) { hasDependencies = true; addDependency(change, pendingChange); } } } } } return hasDependencies; } /** * Check if the given DeleteOperation has some dependencies on any * currently running previous operation. * Update the dependency list in the associated PendingChange if * there are some dependencies. * * DeleteOperation depends on * - DeleteOperation done on children DN * - ModifyDnOperation with target DN that are children of the DEL DN * - AddOperation done on the same DN * * * @param op The DeleteOperation to be checked. * * @return A boolean indicating if this operation has some dependencies. */ public synchronized boolean checkDependencies(DeleteOperation op) { boolean hasDependencies = false; DN targetDn = op.getEntryDN(); ChangeNumber changeNumber = OperationContext.getChangeNumber(op); PendingChange change = pendingChanges.get(changeNumber); if (change == null) return false; for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getChangeNumber().older(changeNumber)) { UpdateMessage pendingMsg = pendingChange.getMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) { /* * Check if the operation to be run is a deleteOperation on a * children of the current DeleteOperation. */ if (pendingChange.getTargetDN().isDescendantOf(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof AddMsg) { /* * Check if the operation to be run is an addOperation on a * parent of the current DeleteOperation. */ if (pendingChange.getTargetDN().equals(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } else if (pendingMsg instanceof ModifyDNMsg) { /* * Check if the operation to be run is an ModifyDNOperation * on a children of the current DeleteOperation */ if (pendingChange.getTargetDN().isDescendantOf(targetDn)) { hasDependencies = true; addDependency(change, pendingChange); } } } } } return hasDependencies; } } opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -170,17 +170,22 @@ /** * This object is used to store the list of update currently being * done on the local database. * It contain both the update that are done directly on this server * and the updates that was done on another server, transmitted * by the replication server and that are currently replayed. * It is usefull to make sure that dependencies between operations * are correctly fullfilled, that the local operations are sent in a * Is is usefull to make sure that the local operations are sent in a * correct order to the replication server and that the ServerState * is not updated too early. */ private PendingChanges pendingChanges; /** * It contain the updates that were done on other servers, transmitted * by the replication server and that are currently replayed. * It is usefull to make sure that dependencies between operations * are correctly fullfilled and to to make sure that the ServerState is * not updated too early. */ private RemotePendingChanges remotePendingChanges; /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. */ @@ -373,10 +378,15 @@ * ChangeNumberGenerator is used to create new unique ChangeNumbers * for each operation done on the replication domain. */ ChangeNumberGenerator generator = new ChangeNumberGenerator(serverId, state); pendingChanges = new PendingChanges(new ChangeNumberGenerator(serverId, state), broker, state); remotePendingChanges = new RemotePendingChanges(generator, state); // listen for changes on the configuration configuration.addChangeListener(this); } @@ -679,7 +689,7 @@ */ public UpdateMessage receive() { UpdateMessage update = pendingChanges.getNextUpdate(); UpdateMessage update = remotePendingChanges.getNextUpdate(); if (update == null) { @@ -774,7 +784,7 @@ */ public void receiveUpdate(UpdateMessage update) { pendingChanges.putRemoteUpdate(update); remotePendingChanges.putRemoteUpdate(update); numRcvdUpdates.incrementAndGet(); } @@ -847,7 +857,14 @@ { try { pendingChanges.commit(curChangeNumber, op, msg); if (op.isSynchronizationOperation()) { remotePendingChanges.commit(curChangeNumber); } else { pendingChanges.commit(curChangeNumber, msg); } } catch (NoSuchElementException e) { @@ -880,8 +897,11 @@ } } int pushedChanges = pendingChanges.pushCommittedChanges(); numSentUpdates.addAndGet(pushedChanges); if (!op.isSynchronizationOperation()) { int pushedChanges = pendingChanges.pushCommittedChanges(); numSentUpdates.addAndGet(pushedChanges); } // Wait for acknowledgement of an assured message. if (msg != null && isAssured) @@ -1111,7 +1131,7 @@ if (op instanceof ModifyOperation) { ModifyOperation newOp = (ModifyOperation) op; dependency = pendingChanges.checkDependencies(newOp); dependency = remotePendingChanges.checkDependencies(newOp); if (!dependency) { done = solveNamingConflict(newOp, msg); @@ -1120,7 +1140,7 @@ else if (op instanceof DeleteOperation) { DeleteOperation newOp = (DeleteOperation) op; dependency = pendingChanges.checkDependencies(newOp); dependency = remotePendingChanges.checkDependencies(newOp); if ((!dependency) && (!firstTry)) { done = solveNamingConflict(newOp, msg); @@ -1130,7 +1150,7 @@ { AddOperation newOp = (AddOperation) op; AddMsg addMsg = (AddMsg) msg; dependency = pendingChanges.checkDependencies(newOp); dependency = remotePendingChanges.checkDependencies(newOp); if (!dependency) { done = solveNamingConflict(newOp, addMsg); @@ -1139,7 +1159,7 @@ else if (op instanceof ModifyDNOperation) { ModifyDNMsg newMsg = (ModifyDNMsg) msg; dependency = pendingChanges.checkDependencies(newMsg); dependency = remotePendingChanges.checkDependencies(newMsg); if (!dependency) { ModifyDNOperation newOp = (ModifyDNOperation) op; @@ -1253,9 +1273,7 @@ */ public void updateError(ChangeNumber changeNumber) { pendingChanges.commit(changeNumber); int pushedChanges = pendingChanges.pushCommittedChanges(); numSentUpdates.addAndGet(pushedChanges); remotePendingChanges.commit(changeNumber); } /**