mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,28 +27,19 @@
 */
package org.opends.server.replication.plugin;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.*;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DN;
import org.opends.server.types.Operation;
/**
 *
 * This class is used to store the list of remote changes received
 * from a replication server and that are either currently being replayed
 * or that are waiting for being replayed.
@@ -57,15 +48,14 @@
 * the dependencies between operations.
 *
 * One of this object is instantiated for each ReplicationDomain.
 *
 */
public final class RemotePendingChanges
{
  /**
   * A map used to store the pending changes.
   */
  private SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  private SortedMap<CSN, PendingChange> pendingChanges =
    new TreeMap<CSN, PendingChange>();
  /**
   * A sorted set containing the list of PendingChanges that have
@@ -110,33 +100,32 @@
   */
  public synchronized void putRemoteUpdate(LDAPUpdateMsg update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    pendingChanges.put(changeNumber, new PendingChange(changeNumber, null,
                                                        update));
    CSN csn = update.getCSN();
    pendingChanges.put(csn, new PendingChange(csn, null, update));
  }
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param csn
   *          The CSN of the update message that must be set as committed.
   */
  public synchronized void commit(ChangeNumber changeNumber)
  public synchronized void commit(CSN csn)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    PendingChange curChange = pendingChanges.get(csn);
    if (curChange == null)
    {
      throw new NoSuchElementException();
    }
    curChange.setCommitted(true);
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      state.update(firstCSN);
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
      {
@@ -144,8 +133,8 @@
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
        firstCSN = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstCSN);
      }
    }
  }
@@ -182,7 +171,7 @@
  private void addDependency(
      PendingChange dependentChange, PendingChange pendingChange)
  {
    dependentChange.addDependency(pendingChange.getChangeNumber());
    dependentChange.addDependency(pendingChange.getCSN());
    dependentChanges.add(dependentChange);
  }
@@ -206,14 +195,14 @@
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    CSN csn = OperationContext.getCSN(op);
    PendingChange change = pendingChanges.get(csn);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      if (pendingChange.getCSN().older(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
@@ -296,14 +285,14 @@
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    CSN csn = OperationContext.getCSN(op);
    PendingChange change = pendingChanges.get(csn);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      if (pendingChange.getCSN().older(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
@@ -354,8 +343,8 @@
  public synchronized boolean checkDependencies(ModifyDNMsg msg)
  {
    boolean hasDependencies = false;
    ChangeNumber changeNumber = msg.getChangeNumber();
    PendingChange change = pendingChanges.get(changeNumber);
    CSN csn = msg.getCSN();
    PendingChange change = pendingChanges.get(csn);
    if (change == null)
      return false;
@@ -364,7 +353,7 @@
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      if (pendingChange.getCSN().older(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
@@ -441,14 +430,14 @@
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    CSN csn = OperationContext.getCSN(op);
    PendingChange change = pendingChanges.get(csn);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      if (pendingChange.getCSN().older(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)