mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -31,8 +31,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.operation.PluginOperation;
@@ -42,7 +42,7 @@
 * in progress and not yet committed in the database.
 *
 * It is used to make sure that operations are sent to the Replication
 * Server in the order defined by their ChangeNumber.
 * Server in the order defined by their CSN.
 * It is also used to update the ServerState at the appropriate time.
 *
 * On object of this class is instantiated for each ReplicationDomain.
@@ -52,14 +52,14 @@
  /**
   * A map used to store the pending changes.
   */
  private SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  private SortedMap<CSN, PendingChange> pendingChanges =
    new TreeMap<CSN, PendingChange>();
  /**
   * The ChangeNumberGenerator to use to create new unique ChangeNumbers
   * The {@link CSNGenerator} to use to create new unique CSNs
   * for each operation done on the replication domain.
   */
  private ChangeNumberGenerator changeNumberGenerator;
  private CSNGenerator csnGenerator;
  /**
   * The ReplicationDomain that will be used to send UpdateMsg.
@@ -69,30 +69,28 @@
  private boolean recoveringOldChanges = false;
  /**
   * Creates a new PendingChanges using the provided ChangeNumberGenerator.
   * Creates a new PendingChanges using the provided CSNGenerator.
   *
   * @param changeNumberGenerator The ChangeNumberGenerator to use to create
   *                               new unique ChangeNumbers.
   * @param domain  The ReplicationDomain that will be used to send
   *                UpdateMsg.
   * @param csnGenerator The CSNGenerator to use to create new unique CSNs.
   * @param domain  The ReplicationDomain that will be used to send UpdateMsg.
   */
  public PendingChanges(
      ChangeNumberGenerator changeNumberGenerator, ReplicationDomain domain)
      CSNGenerator csnGenerator, ReplicationDomain domain)
  {
    this.changeNumberGenerator = changeNumberGenerator;
    this.csnGenerator = csnGenerator;
    this.domain = domain;
  }
  /**
   * Remove and return an update form the pending changes list.
   *
   * @param changeNumber The ChangeNumber of the update to remove.
   *
   * @param csn
   *          The CSN of the update to remove.
   * @return The UpdateMsg that was just removed.
   */
  public synchronized LDAPUpdateMsg remove(ChangeNumber changeNumber)
  public synchronized LDAPUpdateMsg remove(CSN csn)
  {
    return pendingChanges.remove(changeNumber).getMsg();
    return pendingChanges.remove(csn).getMsg();
  }
  /**
@@ -108,14 +106,12 @@
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param csn The CSN of the update message that must be set as committed.
   * @param msg          The message associated to the update.
   */
  public synchronized void commit(ChangeNumber changeNumber,
      LDAPUpdateMsg msg)
  public synchronized void commit(CSN csn,      LDAPUpdateMsg msg)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    PendingChange curChange = pendingChanges.get(csn);
    if (curChange == null)
    {
      throw new NoSuchElementException();
@@ -127,12 +123,11 @@
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param csn The CSN of the update message that must be set as committed.
   */
  public synchronized void commit(ChangeNumber changeNumber)
  public synchronized void commit(CSN csn)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    PendingChange curChange = pendingChanges.get(csn);
    if (curChange == null)
    {
      throw new NoSuchElementException();
@@ -146,14 +141,14 @@
   *
   * @param operation The local operation for which an UpdateMsg must
   *                  be added in the pending list.
   * @return The ChangeNumber now associated to the operation.
   * @return The CSN now associated to the operation.
   */
  public synchronized ChangeNumber putLocalOperation(PluginOperation operation)
  public synchronized CSN putLocalOperation(PluginOperation operation)
  {
    ChangeNumber changeNumber = changeNumberGenerator.newChangeNumber();
    PendingChange change = new PendingChange(changeNumber, operation, null);
    pendingChanges.put(changeNumber, change);
    return changeNumber;
    CSN csn = csnGenerator.newCSN();
    PendingChange change = new PendingChange(csn, operation, null);
    pendingChanges.put(csn, change);
    return csn;
  }
  /**
@@ -169,9 +164,9 @@
      return numSentUpdates;
    }
    // peek the oldest changeNumber
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    // peek the oldest CSN
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    while (firstChange != null && firstChange.isCommitted())
    {
@@ -188,10 +183,10 @@
        {
          // do not push updates until the RS catches up.
          // @see #setRecovering(boolean)
          domain.getServerState().update(updateMsg.getChangeNumber());
          domain.getServerState().update(updateMsg.getCSN());
        }
      }
      pendingChanges.remove(firstChangeNumber);
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
      {
@@ -199,9 +194,9 @@
      }
      else
      {
        // peek the oldest changeNumber
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
        // peek the oldest CSN
        firstCSN = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstCSN);
      }
    }
    return numSentUpdates;
@@ -213,52 +208,50 @@
   * in a single atomic operation.
   *
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param csn The CSN of the update message that must be set as committed.
   * @param msg          The message associated to the update.
   *
   * @return The number of pushed updates.
   */
  public synchronized int commitAndPushCommittedChanges(
      ChangeNumber changeNumber, LDAPUpdateMsg msg)
  public synchronized int commitAndPushCommittedChanges(CSN csn,
      LDAPUpdateMsg msg)
  {
    commit(changeNumber, msg);
    commit(csn, msg);
    return pushCommittedChanges();
  }
  /**
   * Set the PendingChangesList structure in a mode where it is
   * waiting for the RS to receive all the previous changes to
   * be sent before starting to process the changes normally.
   * In this mode, The Domain does not publish the changes from
   * the pendingChanges because there are older changes that
   * need to be published before.
   * Set the PendingChangesList structure in a mode where it is waiting for the
   * RS to receive all the previous changes to be sent before starting to
   * process the changes normally. In this mode, The Domain does not publish the
   * changes from the pendingChanges because there are older changes that need
   * to be published before.
   *
   * @param b The recovering status that must be set.
   * @param recovering
   *          The recovering status that must be set.
   */
  public void setRecovering(boolean b)
  public void setRecovering(boolean recovering)
  {
    recoveringOldChanges = b;
    recoveringOldChanges = recovering;
  }
  /**
   * Allows to update the recovery situation by comparing the ChangeNumber of
   * the last change that was sent to the ReplicationServer with the
   * ChangeNumber of the last operation that was taken out of the
   * PendingChanges list.
   * If he two match then the recovery is completed and normal procedure can
   * restart. Otherwise the RSUpdate thread must continue to look for
   * older changes and no changes can be committed from the pendingChanges list.
   * Allows to update the recovery situation by comparing the CSN of the last
   * change that was sent to the ReplicationServer with the CSN of the last
   * operation that was taken out of the PendingChanges list. If the two match
   * then the recovery is completed and normal procedure can restart. Otherwise
   * the RSUpdate thread must continue to look for older changes and no changes
   * can be committed from the pendingChanges list.
   *
   * @param recovered  The ChangeNumber of the last change that was published
   *                   to the ReplicationServer.
   *
   * @return           A boolean indicating if the recovery is completed (false)
   *                   or must continue (true).
   * @param recovered
   *          The CSN of the last change that was published to the
   *          ReplicationServer.
   * @return A boolean indicating if the recovery is completed (false) or must
   *         continue (true).
   */
  public synchronized boolean recoveryUntil(ChangeNumber recovered)
  public synchronized boolean recoveryUntil(CSN recovered)
  {
    ChangeNumber lastLocalChange = domain.getLastLocalChange();
    CSN lastLocalChange = domain.getLastLocalChange();
    if (recovered != null && recovered.newerOrEquals(lastLocalChange))
    {
      recoveringOldChanges = false;