| | |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | |
| | | import net.jcip.annotations.GuardedBy; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | |
| | | final class RemotePendingChanges |
| | | { |
| | | /** A map used to store the pending changes. */ |
| | | @GuardedBy("pendingChangesLock") |
| | | private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>(); |
| | | |
| | | /** |
| | |
| | | * not been replayed correctly because they are dependent on |
| | | * another change to be completed. |
| | | */ |
| | | @GuardedBy("dependentChangesLock") |
| | | private final SortedSet<PendingChange> dependentChanges = new TreeSet<>(); |
| | | /** |
| | | * {@code activeAndDependentChanges} also contains changes discovered to be dependent |
| | | * on currently in progress changes. |
| | | */ |
| | | private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>(); |
| | | |
| | | private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true); |
| | | private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock(); |
| | | private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock(); |
| | | private ReentrantLock dependentChangesLock = new ReentrantLock(); |
| | | |
| | | /** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */ |
| | | private final ServerState state; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of changes currently in this list. |
| | | * Returns the number of changes waiting to be replayed. |
| | | * |
| | | * @return The number of changes currently in this list. |
| | | * @return The number of changes waiting to be replayed |
| | | */ |
| | | public synchronized int getQueueSize() |
| | | public int getQueueSize() |
| | | { |
| | | pendingChangesReadLock.lock(); |
| | | try |
| | | { |
| | | return pendingChanges.size(); |
| | | } |
| | | finally |
| | | { |
| | | pendingChangesReadLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of changes actively being replayed. |
| | | * |
| | | * @return the number of changes actively being replayed. |
| | | */ |
| | | public int changesInProgressSize() |
| | | { |
| | | return activeAndDependentChanges.size(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of changes depending on other changes. |
| | | * |
| | | * @return the number of changes depending on other changes. |
| | | */ |
| | | public int getDependentChangesSize() |
| | | { |
| | | dependentChangesLock.lock(); |
| | | try |
| | | { |
| | | return dependentChanges.size(); |
| | | } |
| | | finally |
| | | { |
| | | dependentChangesLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Add a new LDAPUpdateMsg that was received from the replication server |
| | |
| | | * @return {@code false} if the update was already registered in the pending |
| | | * changes. |
| | | */ |
| | | public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update) |
| | | public boolean putRemoteUpdate(LDAPUpdateMsg update) |
| | | { |
| | | pendingChangesWriteLock.lock(); |
| | | try |
| | | { |
| | | CSN csn = update.getCSN(); |
| | | return pendingChanges.put(csn, |
| | | new PendingChange(csn, null, update)) == null; |
| | | return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null; |
| | | } |
| | | finally |
| | | { |
| | | pendingChangesWriteLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param csn |
| | | * The CSN of the update message that must be set as committed. |
| | | */ |
| | | public synchronized void commit(CSN csn) |
| | | public void commit(CSN csn) |
| | | { |
| | | pendingChangesWriteLock.lock(); |
| | | try |
| | | { |
| | | PendingChange curChange = pendingChanges.get(csn); |
| | | if (curChange == null) |
| | |
| | | throw new NoSuchElementException(); |
| | | } |
| | | curChange.setCommitted(true); |
| | | activeAndDependentChanges.remove(curChange); |
| | | |
| | | CSN firstCSN = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstCSN); |
| | | |
| | | while (firstChange != null && firstChange.isCommitted()) |
| | | Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | if (firstChange.getMsg().contributesToDomainState()) |
| | | Map.Entry<CSN, PendingChange> change = it.next(); |
| | | PendingChange pendingChange = change.getValue(); |
| | | if (!pendingChange.isCommitted()) |
| | | { |
| | | state.update(firstCSN); |
| | | break; |
| | | } |
| | | pendingChanges.remove(firstCSN); |
| | | |
| | | if (pendingChanges.isEmpty()) |
| | | if (pendingChange.getMsg().contributesToDomainState()) |
| | | { |
| | | firstChange = null; |
| | | state.update(change.getKey()); |
| | | } |
| | | else |
| | | it.remove(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | firstCSN = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstCSN); |
| | | } |
| | | pendingChangesWriteLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | public void markInProgress(LDAPUpdateMsg msg) |
| | | { |
| | | pendingChangesReadLock.lock(); |
| | | try |
| | | { |
| | | activeAndDependentChanges.add(pendingChanges.get(msg.getCSN())); |
| | | } |
| | | finally |
| | | { |
| | | pendingChangesReadLock.unlock(); |
| | | } |
| | | } |
| | | /** |
| | | * Get the first update in the list that have some dependencies cleared. |
| | | * |
| | | * @return The LDAPUpdateMsg to be handled. |
| | | */ |
| | | public synchronized LDAPUpdateMsg getNextUpdate() |
| | | public LDAPUpdateMsg getNextUpdate() |
| | | { |
| | | /* |
| | | * Parse the list of Update with dependencies and check if the dependencies |
| | | * are now cleared until an Update without dependencies is found. |
| | | */ |
| | | for (PendingChange change : dependentChanges) |
| | | pendingChangesReadLock.lock(); |
| | | dependentChangesLock.lock(); |
| | | try |
| | | { |
| | | if (change.dependenciesIsCovered(state)) |
| | | if (!dependentChanges.isEmpty()) |
| | | { |
| | | dependentChanges.remove(change); |
| | | return change.getLDAPUpdateMsg(); |
| | | PendingChange firstDependentChange = dependentChanges.first(); |
| | | if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN())) |
| | | { |
| | | dependentChanges.remove(firstDependentChange); |
| | | return firstDependentChange.getLDAPUpdateMsg(); |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | dependentChangesLock.unlock(); |
| | | pendingChangesReadLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Mark the first pendingChange as dependent on the second PendingChange. |
| | | * @param dependentChange The PendingChange that depend on the second |
| | | * PendingChange. |
| | | * @param pendingChange The PendingChange on which the first PendingChange |
| | | * is dependent. |
| | | */ |
| | | private void addDependency( |
| | | PendingChange dependentChange, PendingChange pendingChange) |
| | | private void addDependency(PendingChange dependentChange) |
| | | { |
| | | dependentChange.addDependency(pendingChange.getCSN()); |
| | | dependentChangesLock.lock(); |
| | | try |
| | | { |
| | | dependentChanges.add(dependentChange); |
| | | } |
| | | finally |
| | | { |
| | | dependentChangesLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private PendingChange getPendingChange(CSN csn) |
| | | { |
| | | pendingChangesReadLock.lock(); |
| | | try |
| | | { |
| | | return pendingChanges.get(csn); |
| | | } |
| | | finally |
| | | { |
| | | pendingChangesReadLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check if the given AddOperation has some dependencies on any |
| | |
| | | * |
| | | * @return A boolean indicating if this operation has some dependencies. |
| | | */ |
| | | public synchronized boolean checkDependencies(AddOperation op) |
| | | public boolean checkDependencies(AddOperation op) |
| | | { |
| | | boolean hasDependencies = false; |
| | | final DN targetDN = op.getEntryDN(); |
| | | final CSN csn = OperationContext.getCSN(op); |
| | | final PendingChange change = pendingChanges.get(csn); |
| | | final PendingChange change = getPendingChange(csn); |
| | | |
| | | if (change == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | for (PendingChange pendingChange : activeAndDependentChanges) |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | if (pendingChange.getCSN().isNewerThanOrEqualTo(csn)) |
| | | { |
| | | // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now. |
| | | break; |
| | | } |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg != null) |
| | | { |
| | | if (pendingMsg instanceof DeleteMsg) |
| | | { |
| | | /* |
| | | * Check is the operation to be run is a deleteOperation on the |
| | | * same DN. |
| | | * Check if the operation to be run is a deleteOperation on the same DN. |
| | | */ |
| | | if (pendingMsg.getDN().equals(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | else if (pendingMsg instanceof AddMsg) |
| | |
| | | if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | else if (pendingMsg instanceof ModifyDNMsg) |
| | |
| | | if (pendingMsg.getDN().equals(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | else |
| | | { |
| | |
| | | if (pendingModDn.newDNIsParent(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // We reached an operation that is newer than the operation |
| | | // for which we are doing the dependency check so it is |
| | | // not possible to find another operation with some dependency. |
| | | // break the loop to avoid going through the potentially large |
| | | // list of pending changes. |
| | | break; |
| | | } |
| | | } |
| | | return hasDependencies; |
| | | } |
| | | |
| | |
| | | * |
| | | * ModifyOperation depends on |
| | | * - AddOperation done on the same DN |
| | | * - ModifyDNOperation having newDN the same as targetDN |
| | | * |
| | | * @param op The ModifyOperation to be checked. |
| | | * |
| | | * @return A boolean indicating if this operation has some dependencies. |
| | | */ |
| | | public synchronized boolean checkDependencies(ModifyOperation op) |
| | | public boolean checkDependencies(ModifyOperation op) |
| | | { |
| | | boolean hasDependencies = false; |
| | | final DN targetDN = op.getEntryDN(); |
| | | final CSN csn = OperationContext.getCSN(op); |
| | | final PendingChange change = pendingChanges.get(csn); |
| | | final PendingChange change = getPendingChange(csn); |
| | | |
| | | if (change == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | final DN targetDN = change.getLDAPUpdateMsg().getDN(); |
| | | for (PendingChange pendingChange : activeAndDependentChanges) |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | if (pendingChange.getCSN().isNewerThanOrEqualTo(csn)) |
| | | { |
| | | // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now. |
| | | break; |
| | | } |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg instanceof AddMsg) |
| | | { |
| | |
| | | if (pendingMsg.getDN().equals(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | else if (pendingMsg instanceof ModifyDNMsg) |
| | | { |
| | | // We reached an operation that is newer than the operation |
| | | // for which we are doing the dependency check so it is |
| | | // not possible to find another operation with some dependency. |
| | | // break the loop to avoid going through the potentially large |
| | | // list of pending changes. |
| | | break; |
| | | if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change); |
| | | } |
| | | } |
| | | } |
| | | return hasDependencies; |
| | |
| | | * - DeleteOperation done on the new DN of the MODDN operation |
| | | * - ModifyDNOperation done from the new DN of the MODDN operation |
| | | * |
| | | * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first |
| | | * |
| | | * @param msg The ModifyDNMsg to be checked. |
| | | * |
| | | * @return A boolean indicating if this operation has some dependencies. |
| | | */ |
| | | private synchronized boolean checkDependencies(ModifyDNMsg msg) |
| | | public boolean checkDependencies(ModifyDNMsg msg) |
| | | { |
| | | boolean hasDependencies = false; |
| | | final CSN csn = msg.getCSN(); |
| | | final PendingChange change = pendingChanges.get(csn); |
| | | final PendingChange change = getPendingChange(csn); |
| | | |
| | | if (change == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | final DN targetDN = change.getLDAPUpdateMsg().getDN(); |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | for (PendingChange pendingChange : activeAndDependentChanges) |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | if (pendingChange.getCSN().isNewerThanOrEqualTo(csn)) |
| | | { |
| | | // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now. |
| | | break; |
| | | } |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg != null) |
| | | { |
| | | if (pendingMsg instanceof DeleteMsg) |
| | | { |
| | | // Check if the target of the Delete is the same |
| | |
| | | if (msg.newDNIsEqual(pendingMsg.getDN())) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | else if (pendingMsg instanceof AddMsg) |
| | |
| | | if (msg.newParentIsEqual(pendingMsg.getDN())) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | // Check if the AddOperation was done on the same DN as the |
| | | // target DN of the MODDN operation |
| | | if (pendingMsg.getDN().equals(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | else if (pendingMsg instanceof ModifyDNMsg) |
| | |
| | | if (msg.newDNIsEqual(pendingMsg.getDN())) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // We reached an operation that is newer than the operation |
| | | // for which we are doing the dependency check so it is |
| | | // not possible to find another operation with some dependency. |
| | | // break the loop to avoid going through the potentially large |
| | | // list of pending changes. |
| | | break; |
| | | } |
| | | } |
| | | return hasDependencies; |
| | | } |
| | | |
| | |
| | | * |
| | | * @return A boolean indicating if this operation has some dependencies. |
| | | */ |
| | | public synchronized boolean checkDependencies(DeleteOperation op) |
| | | public boolean checkDependencies(DeleteOperation op) |
| | | { |
| | | boolean hasDependencies = false; |
| | | final DN targetDN = op.getEntryDN(); |
| | | final CSN csn = OperationContext.getCSN(op); |
| | | final PendingChange change = pendingChanges.get(csn); |
| | | final PendingChange change = getPendingChange(csn); |
| | | |
| | | if (change == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | for (PendingChange pendingChange : activeAndDependentChanges) |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | if (pendingChange.getCSN().isNewerThanOrEqualTo(csn)) |
| | | { |
| | | // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now. |
| | | break; |
| | | } |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg != null) |
| | | { |
| | | if (pendingMsg instanceof DeleteMsg) |
| | | { |
| | | /* |
| | |
| | | if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | else if (pendingMsg instanceof AddMsg) |
| | |
| | | if (pendingMsg.getDN().equals(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | else if (pendingMsg instanceof ModifyDNMsg) |
| | |
| | | * Check if the operation to be run is an ModifyDNOperation |
| | | * on a children of the current DeleteOperation |
| | | */ |
| | | if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) |
| | | || pendingModDn.newDNIsParent(targetDN)) |
| | | if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN)) |
| | | { |
| | | hasDependencies = true; |
| | | addDependency(change, pendingChange); |
| | | addDependency(change); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // We reached an operation that is newer than the operation |
| | | // for which we are doing the dependency check so it is |
| | | // not possible to find another operation with some dependency. |
| | | // break the loop to avoid going through the potentially large |
| | | // list of pending changes. |
| | | break; |
| | | } |
| | | } |
| | | return hasDependencies; |
| | | } |
| | | |
| | |
| | | { |
| | | DeleteOperation newOp = (DeleteOperation) op; |
| | | return checkDependencies(newOp); |
| | | |
| | | } else if (op instanceof AddOperation) |
| | | } |
| | | else if (op instanceof AddOperation) |
| | | { |
| | | AddOperation newOp = (AddOperation) op; |
| | | return checkDependencies(newOp); |
| | | } else if (op instanceof ModifyDNOperationBasis) |
| | | } |
| | | else if (op instanceof ModifyDNOperationBasis) |
| | | { |
| | | ModifyDNMsg newMsg = (ModifyDNMsg) msg; |
| | | return checkDependencies(newMsg); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | return true; // unknown type of operation ?! |
| | | } |