mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
08.03.2008 7adb93986ace907531875e25be1f94d735fbb068
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,14 +26,19 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import org.opends.messages.Message;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.types.operation.PluginOperation;
/**
@@ -61,32 +66,23 @@
  private ChangeNumberGenerator changeNumberGenerator;
  /**
   * The Replicationbroker that will be used to send UpdateMsg.
   * The ReplicationDomain that will be used to send UpdateMsg.
   */
  private ReplicationBroker broker;
  /**
   * The ServerState that will be updated when UpdateMsg are committed.
   */
  private ServerState state;
  private ReplicationDomain domain;
  /**
   * Creates a new PendingChanges using the provided ChangeNumberGenerator.
   *
   * @param changeNumberGenerator The ChangeNumberGenerator to use to create
   *                               new unique ChangeNumbers.
   * @param broker  The Replicationbroker that will be used to send
   * @param domain  The ReplicationDomain that will be used to send
   *                UpdateMsg.
   * @param state   The ServerState that will be updated when UpdateMsg
   *                are committed.
   */
  public PendingChanges(
      ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
      ServerState state)
      ChangeNumberGenerator changeNumberGenerator, ReplicationDomain domain)
  {
    this.changeNumberGenerator = changeNumberGenerator;
    this.broker = broker;
    this.state = state;
    this.domain = domain;
  }
  /**
@@ -96,7 +92,7 @@
   *
   * @return The UpdateMsg that was just removed.
   */
  public synchronized UpdateMsg remove(ChangeNumber changeNumber)
  public synchronized LDAPUpdateMsg remove(ChangeNumber changeNumber)
  {
    return pendingChanges.remove(changeNumber).getMsg();
  }
@@ -119,7 +115,7 @@
   * @param msg          The message associated to the update.
   */
  public synchronized void commit(ChangeNumber changeNumber,
      UpdateMsg msg)
      LDAPUpdateMsg msg)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    if (curChange == null)
@@ -186,9 +182,19 @@
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
        LDAPUpdateMsg updateMsg = firstChange.getMsg();
          try
          {
            domain.publish(updateMsg);
          } catch (TimeoutException ex) {
            // This exception may only be raised if assured replication is
            // enabled
            Message errorMsg = ERR_DS_ACK_TIMEOUT.get(
              domain.getServiceID(), Long.toString(domain.getAssuredTimeout()),
              updateMsg.toString());
            logError(errorMsg);
          }
        }
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())