| | |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeMap; |
| | | import java.util.TreeSet; |
| | | import java.util.*; |
| | | |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.OperationContext; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Operation; |
| | | |
| | | /** |
| | | * |
| | | * This class stores the remote changes received from a replication |
| | | * server that are either currently being replayed or waiting to be |
| | | * replayed. |
| | |
| | | * the dependencies between operations. |
| | | * |
| | | * One instance of this class is created for each ReplicationDomain. |
| | | * |
| | | */ |
| | | public final class RemotePendingChanges |
| | | { |
| | | /** |
| | | * A map used to store the pending changes. It is a SortedMap backed by |
| | | * a TreeMap, so its entries are kept ordered by key. |
| | | * NOTE(review): the ChangeNumber-keyed and the CSN-keyed declarations |
| | | * both appear below; they look like the two sides of a ChangeNumber to |
| | | * CSN rename - confirm which one is current. |
| | | */ |
| | | private SortedMap<ChangeNumber, PendingChange> pendingChanges = |
| | | new TreeMap<ChangeNumber, PendingChange>(); |
| | | private SortedMap<CSN, PendingChange> pendingChanges = |
| | | new TreeMap<CSN, PendingChange>(); |
| | | |
| | | /** |
| | | * A sorted set containing the list of PendingChanges that have |
| | |
| | | */ |
| | | public synchronized void putRemoteUpdate(LDAPUpdateMsg update) |
| | | { |
| | | // NOTE(review): the ChangeNumber-based and the CSN-based statements both |
| | | // appear below; they look like the two sides of a ChangeNumber to CSN |
| | | // rename - confirm which one is current. |
| | | // Wrap the update in a new PendingChange and store it in the pending |
| | | // changes map under the key read from the update itself. The second |
| | | // constructor argument is passed as null here. |
| | | ChangeNumber changeNumber = update.getChangeNumber(); |
| | | pendingChanges.put(changeNumber, new PendingChange(changeNumber, null, |
| | | update)); |
| | | CSN csn = update.getCSN(); |
| | | pendingChanges.put(csn, new PendingChange(csn, null, update)); |
| | | } |
| | | |
| | | /** |
| | | * Mark an update message as committed, then remove from the head of the |
| | | * pending changes map every change that is committed, passing each |
| | | * removed key to state.update(). |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param csn |
| | | * The CSN of the update message that must be set as committed. |
| | | * @throws NoSuchElementException |
| | | * if no pending change is stored under the given key. |
| | | */ |
| | | public synchronized void commit(ChangeNumber changeNumber) |
| | | public synchronized void commit(CSN csn) |
| | | { |
| | | // NOTE(review): the ChangeNumber-based and the CSN-based statements both |
| | | // appear in this method; they look like the two sides of a ChangeNumber |
| | | // to CSN rename - confirm which one is current. |
| | | PendingChange curChange = pendingChanges.get(changeNumber); |
| | | PendingChange curChange = pendingChanges.get(csn); |
| | | if (curChange == null) |
| | | { |
| | | // Committing a change that was never registered is reported to the caller. |
| | | throw new NoSuchElementException(); |
| | | } |
| | | curChange.setCommitted(true); |
| | | |
| | | // Start from the lowest key of the sorted map. The map cannot be empty |
| | | // here because the lookup above just succeeded. |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | CSN firstCSN = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstCSN); |
| | | |
| | | // Drain the head of the map for as long as it holds committed changes; |
| | | // the loop stops at the first change that is not committed yet. |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | state.update(firstChangeNumber); |
| | | pendingChanges.remove(firstChangeNumber); |
| | | state.update(firstCSN); |
| | | pendingChanges.remove(firstCSN); |
| | | |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | // NOTE(review): the body of this branch is not visible here; presumably |
| | | // it resets firstChange so that the loop terminates - confirm. |
| |
| | | } |
| | | else |
| | | { |
| | | // Move on to the new lowest key now that the previous head is removed. |
| | | firstChangeNumber = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstChangeNumber); |
| | | firstCSN = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstCSN); |
| | | } |
| | | } |
| | | } |
| | |
| | | private void addDependency( |
| | | PendingChange dependentChange, PendingChange pendingChange) |
| | | { |
| | | // NOTE(review): the getChangeNumber() and getCSN() variants both appear |
| | | // below; they look like the two sides of a ChangeNumber to CSN rename - |
| | | // confirm which one is current. |
| | | // Record the key of pendingChange as a dependency of dependentChange. |
| | | dependentChange.addDependency(pendingChange.getChangeNumber()); |
| | | dependentChange.addDependency(pendingChange.getCSN()); |
| | | // Register dependentChange in the dependentChanges collection. |
| | | dependentChanges.add(dependentChange); |
| | | } |
| | | |
| | |
| | | { |
| | | boolean hasDependencies = false; |
| | | DN targetDn = op.getEntryDN(); |
| | | ChangeNumber changeNumber = OperationContext.getChangeNumber(op); |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | CSN csn = OperationContext.getCSN(op); |
| | | PendingChange change = pendingChanges.get(csn); |
| | | if (change == null) |
| | | return false; |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | { |
| | | if (pendingChange.getChangeNumber().older(changeNumber)) |
| | | if (pendingChange.getCSN().older(csn)) |
| | | { |
| | | LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | if (pendingMsg != null) |
| | |
| | | { |
| | | boolean hasDependencies = false; |
| | | DN targetDn = op.getEntryDN(); |
| | | ChangeNumber changeNumber = OperationContext.getChangeNumber(op); |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | CSN csn = OperationContext.getCSN(op); |
| | | PendingChange change = pendingChanges.get(csn); |
| | | if (change == null) |
| | | return false; |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | { |
| | | if (pendingChange.getChangeNumber().older(changeNumber)) |
| | | if (pendingChange.getCSN().older(csn)) |
| | | { |
| | | LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | if (pendingMsg != null) |
| | |
| | | public synchronized boolean checkDependencies(ModifyDNMsg msg) |
| | | { |
| | | boolean hasDependencies = false; |
| | | ChangeNumber changeNumber = msg.getChangeNumber(); |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | CSN csn = msg.getCSN(); |
| | | PendingChange change = pendingChanges.get(csn); |
| | | if (change == null) |
| | | return false; |
| | | |
| | |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | { |
| | | if (pendingChange.getChangeNumber().older(changeNumber)) |
| | | if (pendingChange.getCSN().older(csn)) |
| | | { |
| | | LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | if (pendingMsg != null) |
| | |
| | | { |
| | | boolean hasDependencies = false; |
| | | DN targetDn = op.getEntryDN(); |
| | | ChangeNumber changeNumber = OperationContext.getChangeNumber(op); |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | CSN csn = OperationContext.getCSN(op); |
| | | PendingChange change = pendingChanges.get(csn); |
| | | if (change == null) |
| | | return false; |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | { |
| | | if (pendingChange.getChangeNumber().older(changeNumber)) |
| | | if (pendingChange.getCSN().older(csn)) |
| | | { |
| | | LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | if (pendingMsg != null) |