mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.12.2014 2cf4412179a4ca8610d7fbb2108040377290bf82
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,8 +26,8 @@
 */
package org.opends.server.replication.plugin;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.server.replication.common.CSN;
@@ -51,19 +51,19 @@
  /**
   * A map used to store the pending changes.
   */
  private SortedMap<CSN, PendingChange> pendingChanges =
    new TreeMap<CSN, PendingChange>();
  private final TreeMap<CSN, PendingChange> pendingChanges =
      new TreeMap<CSN, PendingChange>();
  /**
   * The {@link CSNGenerator} to use to create new unique CSNs
   * for each operation done on the replication domain.
   */
  private CSNGenerator csnGenerator;
  private final CSNGenerator csnGenerator;
  /**
   * The ReplicationDomain that will be used to send UpdateMsg.
   */
  private ReplicationDomain domain;
  private final ReplicationDomain domain;
  private boolean recoveringOldChanges = false;
@@ -128,34 +128,32 @@
  synchronized CSN putLocalOperation(PluginOperation operation)
  {
    final CSN csn = csnGenerator.newCSN();
    final PendingChange change = new PendingChange(csn, operation, null);
    pendingChanges.put(csn, change);
    if (!operation.isSynchronizationOperation())
    {
      pendingChanges.put(csn, new PendingChange(csn, operation, null));
    }
    return csn;
  }
  /**
   * Push all committed local changes to the replicationServer service.
   *
   * @return The number of pushed updates.
   */
  synchronized int pushCommittedChanges()
  synchronized void pushCommittedChanges()
  {
    int numSentUpdates = 0;
    if (pendingChanges.isEmpty())
    // peek the oldest change
    Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry();
    if (firstEntry == null)
    {
      return numSentUpdates;
      return;
    }
    // peek the oldest CSN
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    PendingChange firstChange = firstEntry.getValue();
    while (firstChange != null && firstChange.isCommitted())
    {
      final PluginOperation op = firstChange.getOp();
      if (op != null && !op.isSynchronizationOperation())
      {
        numSentUpdates++;
        final LDAPUpdateMsg updateMsg = firstChange.getMsg();
        if (!recoveringOldChanges)
        {
@@ -168,20 +166,14 @@
          domain.getServerState().update(updateMsg.getCSN());
        }
      }
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        // peek the oldest CSN
        firstCSN = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstCSN);
      }
      // false warning: firstEntry will not be null if firstChange is not null
      pendingChanges.remove(firstEntry.getKey());
      // peek the oldest change
      firstEntry = pendingChanges.firstEntry();
      firstChange = firstEntry != null ? firstEntry.getValue() : null;
    }
    return numSentUpdates;
  }
  /**
@@ -189,16 +181,13 @@
   * push all committed local changes to the replicationServer service
   * in a single atomic operation.
   *
   *
   * @param csn The CSN of the update message that must be set as committed.
   * @param msg          The message associated to the update.
   *
   * @return The number of pushed updates.
   * @param msg The message associated to the update.
   */
  synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
  synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
  {
    commit(csn, msg);
    return pushCommittedChanges();
    pushCommittedChanges();
  }
  /**