mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Fabio Pistolesi
11.39.2016 3c140af6c756b30b325ce3c6ed080e8898e2b7ec
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,7 +27,11 @@
package org.opends.server.replication.plugin;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.jcip.annotations.GuardedBy;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -51,6 +55,7 @@
final class RemotePendingChanges
{
  /** A map used to store the pending changes. */
  @GuardedBy("pendingChangesLock")
  private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>();
  /**
@@ -58,7 +63,18 @@
   * not been replayed correctly because they are dependent on
   * another change to be completed.
   */
  @GuardedBy("dependentChangesLock")
  private final SortedSet<PendingChange> dependentChanges = new TreeSet<>();
  /**
   * {@code activeAndDependentChanges} also contains changes discovered to be dependent
   * on currently in progress changes.
   */
  private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>();
  private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true);
  private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock();
  private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock();
  private ReentrantLock dependentChangesLock = new ReentrantLock();
  /** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */
  private final ServerState state;
@@ -75,13 +91,49 @@
  }
  /**
   * Returns the number of changes currently in this list.
   * Returns the number of changes waiting to be replayed.
   *
   * @return The number of changes currently in this list.
   * @return The number of changes waiting to be replayed
   */
  public synchronized int getQueueSize()
  public int getQueueSize()
  {
    return pendingChanges.size();
    pendingChangesReadLock.lock();
    try
    {
      return pendingChanges.size();
    }
    finally
    {
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Returns the number of changes actively being replayed.
   *
   * @return the number of changes actively being replayed.
   */
  public int changesInProgressSize()
  {
    return activeAndDependentChanges.size();
  }
  /**
   * Returns the number of changes depending on other changes.
   *
   * @return the number of changes depending on other changes.
   */
  public int getDependentChangesSize()
  {
    dependentChangesLock.lock();
    try
    {
      return dependentChanges.size();
    }
    finally
    {
      dependentChangesLock.unlock();
    }
  }
  /**
@@ -93,11 +145,18 @@
   * @return {@code false} if the update was already registered in the pending
   *         changes.
   */
  public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update)
  public boolean putRemoteUpdate(LDAPUpdateMsg update)
  {
    CSN csn = update.getCSN();
    return pendingChanges.put(csn,
        new PendingChange(csn, null, update)) == null;
    pendingChangesWriteLock.lock();
    try
    {
      CSN csn = update.getCSN();
      return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null;
    }
    finally
    {
      pendingChangesWriteLock.unlock();
    }
  }
  /**
@@ -106,72 +165,111 @@
   * @param csn
   *          The CSN of the update message that must be set as committed.
   */
  public synchronized void commit(CSN csn)
  public void commit(CSN csn)
  {
    PendingChange curChange = pendingChanges.get(csn);
    if (curChange == null)
    pendingChangesWriteLock.lock();
    try
    {
      throw new NoSuchElementException();
      PendingChange curChange = pendingChanges.get(csn);
      if (curChange == null)
      {
        throw new NoSuchElementException();
      }
      curChange.setCommitted(true);
      activeAndDependentChanges.remove(curChange);
      Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator();
      while (it.hasNext())
      {
        Map.Entry<CSN, PendingChange> change = it.next();
        PendingChange pendingChange = change.getValue();
        if (!pendingChange.isCommitted())
        {
          break;
        }
        if (pendingChange.getMsg().contributesToDomainState())
        {
          state.update(change.getKey());
        }
        it.remove();
      }
    }
    curChange.setCommitted(true);
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    while (firstChange != null && firstChange.isCommitted())
    finally
    {
      if (firstChange.getMsg().contributesToDomainState())
      {
        state.update(firstCSN);
      }
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstCSN = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstCSN);
      }
      pendingChangesWriteLock.unlock();
    }
  }
  public void markInProgress(LDAPUpdateMsg msg)
  {
    pendingChangesReadLock.lock();
    try
    {
      activeAndDependentChanges.add(pendingChanges.get(msg.getCSN()));
    }
    finally
    {
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Get the first update in the list that have some dependencies cleared.
   *
   * @return The LDAPUpdateMsg to be handled.
   */
  public synchronized LDAPUpdateMsg getNextUpdate()
  public LDAPUpdateMsg getNextUpdate()
  {
    /*
     * Parse the list of Update with dependencies and check if the dependencies
     * are now cleared until an Update without dependencies is found.
     */
    for (PendingChange change : dependentChanges)
    pendingChangesReadLock.lock();
    dependentChangesLock.lock();
    try
    {
      if (change.dependenciesIsCovered(state))
      if (!dependentChanges.isEmpty())
      {
        dependentChanges.remove(change);
        return change.getLDAPUpdateMsg();
        PendingChange firstDependentChange = dependentChanges.first();
        if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN()))
        {
          dependentChanges.remove(firstDependentChange);
          return firstDependentChange.getLDAPUpdateMsg();
        }
      }
      return null;
    }
    return null;
    finally
    {
      dependentChangesLock.unlock();
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Mark the first pendingChange as dependent on the second PendingChange.
   * @param dependentChange The PendingChange that depend on the second
   *                        PendingChange.
   * @param pendingChange   The PendingChange on which the first PendingChange
   *                        is dependent.
   */
  private void addDependency(
      PendingChange dependentChange, PendingChange pendingChange)
  private void addDependency(PendingChange dependentChange)
  {
    dependentChange.addDependency(pendingChange.getCSN());
    dependentChanges.add(dependentChange);
    dependentChangesLock.lock();
    try
    {
      dependentChanges.add(dependentChange);
    }
    finally
    {
      dependentChangesLock.unlock();
    }
  }
  private PendingChange getPendingChange(CSN csn)
  {
    pendingChangesReadLock.lock();
    try
    {
      return pendingChanges.get(csn);
    }
    finally
    {
      pendingChangesReadLock.unlock();
    }
  }
  /**
@@ -190,80 +288,70 @@
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(AddOperation op)
  public boolean checkDependencies(AddOperation op)
  {
    boolean hasDependencies = false;
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
      if (pendingMsg instanceof DeleteMsg)
      {
          /*
           * Check if the operation to be run is a deleteOperation on the same DN.
           */
        if (pendingMsg.getDN().equals(targetDN))
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check is the operation to be run is a deleteOperation on the
             * same DN.
             */
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * parent of the current AddOperation.
             */
            if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            /*
             * Check if the operation to be run is ModifyDnOperation with
             * the same target DN as the ADD DN
             * or a ModifyDnOperation with new DN equals to the ADD DN parent
             */
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            else
            {
              final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
              if (pendingModDn.newDNIsParent(targetDN))
              {
                hasDependencies = true;
                addDependency(change, pendingChange);
              }
            }
          }
          hasDependencies = true;
          addDependency(change);
        }
      }
      else
      else if (pendingMsg instanceof AddMsg)
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
          /*
           * Check if the operation to be run is an addOperation on a
           * parent of the current AddOperation.
           */
        if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
      else if (pendingMsg instanceof ModifyDNMsg)
      {
          /*
           * Check if the operation to be run is ModifyDnOperation with
           * the same target DN as the ADD DN
           * or a ModifyDnOperation with new DN equals to the ADD DN parent
           */
        if (pendingMsg.getDN().equals(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
        else
        {
          final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
          if (pendingModDn.newDNIsParent(targetDN))
          {
            hasDependencies = true;
            addDependency(change);
          }
        }
      }
    }
    return hasDependencies;
@@ -277,45 +365,48 @@
   *
   * ModifyOperation depends on
   * - AddOperation done on the same DN
   * - ModifyDNOperation having newDN the same as targetDN
   *
   * @param op The ModifyOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyOperation op)
  public boolean checkDependencies(ModifyOperation op)
  {
    boolean hasDependencies = false;
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
        return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    final DN targetDN = change.getLDAPUpdateMsg().getDN();
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg instanceof AddMsg)
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
      if (pendingMsg instanceof AddMsg)
      {
        // Check if the operation to be run is an addOperation on a same DN.
        if (pendingMsg.getDN().equals(targetDN))
        {
          // Check if the operation to be run is an addOperation on a same DN.
          if (pendingMsg.getDN().equals(targetDN))
          {
            hasDependencies = true;
            addDependency(change, pendingChange);
          }
          hasDependencies = true;
          addDependency(change);
        }
      }
      else
      else if (pendingMsg instanceof ModifyDNMsg)
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
        if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
    }
    return hasDependencies;
@@ -333,76 +424,68 @@
   * - DeleteOperation done on the new DN of the MODDN operation
   * - ModifyDNOperation done from the new DN of the MODDN operation
   *
   * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first
   *
   * @param msg The ModifyDNMsg to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  private synchronized boolean checkDependencies(ModifyDNMsg msg)
  public boolean checkDependencies(ModifyDNMsg msg)
  {
    boolean hasDependencies = false;
    final CSN csn = msg.getCSN();
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    final DN targetDN = change.getLDAPUpdateMsg().getDN();
    for (PendingChange pendingChange : pendingChanges.values())
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
      if (pendingMsg instanceof DeleteMsg)
      {
        // Check if the target of the Delete is the same
        // as the new DN of this ModifyDN
        if (msg.newDNIsEqual(pendingMsg.getDN()))
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            // Check if the target of the Delete is the same
            // as the new DN of this ModifyDN
            if (msg.newDNIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            // Check if the Add Operation was done on the new parent of
            // the MODDN  operation
            if (msg.newParentIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            // Check if the AddOperation was done on the same DN as the
            // target DN of the MODDN operation
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            // Check if the ModifyDNOperation was done from the new DN of
            // the MODDN operation
            if (msg.newDNIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          hasDependencies = true;
          addDependency(change);
        }
      }
      else
      else if (pendingMsg instanceof AddMsg)
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
        // Check if the Add Operation was done on the new parent of
        // the MODDN  operation
        if (msg.newParentIsEqual(pendingMsg.getDN()))
        {
          hasDependencies = true;
          addDependency(change);
        }
        // Check if the AddOperation was done on the same DN as the
        // target DN of the MODDN operation
        if (pendingMsg.getDN().equals(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
      else if (pendingMsg instanceof ModifyDNMsg)
      {
        // Check if the ModifyDNOperation was done from the new DN of
        // the MODDN operation
        if (msg.newDNIsEqual(pendingMsg.getDN()))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
    }
    return hasDependencies;
@@ -424,72 +507,62 @@
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(DeleteOperation op)
  public boolean checkDependencies(DeleteOperation op)
  {
    boolean hasDependencies = false;
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
      if (pendingMsg instanceof DeleteMsg)
      {
          /*
           * Check if the operation to be run is a deleteOperation on a
           * children of the current DeleteOperation.
           */
        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check if the operation to be run is a deleteOperation on a
             * children of the current DeleteOperation.
             */
            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * parent of the current DeleteOperation.
             */
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
            /*
             * Check if the operation to be run is an ModifyDNOperation
             * on a children of the current DeleteOperation
             */
            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)
                || pendingModDn.newDNIsParent(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          hasDependencies = true;
          addDependency(change);
        }
      }
      else
      else if (pendingMsg instanceof AddMsg)
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
          /*
           * Check if the operation to be run is an addOperation on a
           * parent of the current DeleteOperation.
           */
        if (pendingMsg.getDN().equals(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
      else if (pendingMsg instanceof ModifyDNMsg)
      {
        final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
          /*
           * Check if the operation to be run is an ModifyDNOperation
           * on a children of the current DeleteOperation
           */
        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
    }
    return hasDependencies;
@@ -514,16 +587,18 @@
    {
      DeleteOperation newOp = (DeleteOperation) op;
      return checkDependencies(newOp);
    } else if (op instanceof AddOperation)
    }
    else if (op instanceof AddOperation)
    {
      AddOperation newOp = (AddOperation) op;
      return checkDependencies(newOp);
    } else if (op instanceof ModifyDNOperationBasis)
    }
    else if (op instanceof ModifyDNOperationBasis)
    {
      ModifyDNMsg newMsg = (ModifyDNMsg) msg;
      return checkDependencies(newMsg);
    } else
    }
    else
    {
      return true;  // unknown type of operation ?!
    }