mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -58,6 +53,11 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
/**
 * This class should be used as a base for Replication implementations.
 * <p>
@@ -141,8 +141,8 @@
   * to be able to correlate all the coming back acks to the original
   * operation.
   */
  private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs =
    new ConcurrentHashMap<ChangeNumber, UpdateMsg>();
  private final Map<CSN, UpdateMsg> waitingAckMsgs =
    new ConcurrentHashMap<CSN, UpdateMsg>();
  /**
@@ -291,10 +291,10 @@
  private final ServerState state;
  /**
   * The generator that will be used to generate {@link ChangeNumber}
   * The generator that will be used to generate {@link CSN}
   * for this domain.
   */
  private final ChangeNumberGenerator generator;
  private final CSNGenerator generator;
  private final Object eclIncludesLock = new Object();
  private final Map<Integer, Set<String>> eclIncludesByServer =
@@ -313,13 +313,13 @@
  private final Object sessionLock = new Object();
  /**
   * Returns the {@link ChangeNumberGenerator} that will be used to
   * generate {@link ChangeNumber} for this domain.
   * Returns the {@link CSNGenerator} that will be used to
   * generate {@link CSN} for this domain.
   *
   * @return The {@link ChangeNumberGenerator} that will be used to
   *         generate {@link ChangeNumber} for this domain.
   * @return The {@link CSNGenerator} that will be used to
   *         generate {@link CSN} for this domain.
   */
  public ChangeNumberGenerator getGenerator()
  public CSNGenerator getGenerator()
  {
    return generator;
  }
@@ -341,7 +341,7 @@
    this.serverID = serverID;
    this.initWindow = initWindow;
    this.state = new ServerState();
    this.generator = new ChangeNumberGenerator(serverID, state);
    this.generator = new CSNGenerator(serverID, state);
    domains.put(baseDN, this);
  }
@@ -364,7 +364,7 @@
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.state = serverState;
    this.generator = new ChangeNumberGenerator(serverID, state);
    this.generator = new CSNGenerator(serverID, state);
    domains.put(baseDN, this);
  }
@@ -830,7 +830,7 @@
        else if (msg instanceof UpdateMsg)
        {
          update = (UpdateMsg) msg;
          generator.adjust(update.getChangeNumber());
          generator.adjust(update.getCSN());
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
@@ -915,12 +915,11 @@
   */
  private void receiveAck(AckMsg ack)
  {
    UpdateMsg update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    CSN csn = ack.getCSN();
    // Remove the message for pending ack list (this may already make the thread
    // that is waiting for the ack be aware of its reception)
    update = waitingAckMsgs.remove(changeNumber);
    UpdateMsg update = waitingAckMsgs.remove(csn);
    // Signal waiting thread ack has been received
    if (update != null)
@@ -2705,7 +2704,7 @@
    is used the ack is sent without error at replay flag.
    */
    processUpdateDone(msg, null);
    state.update(msg.getChangeNumber());
    state.update(msg.getCSN());
  }
  /**
@@ -3212,7 +3211,7 @@
          if (rsGroupId == groupId)
          {
            // Send the ack
            AckMsg ackMsg = new AckMsg(msg.getChangeNumber());
            AckMsg ackMsg = new AckMsg(msg.getCSN());
            if (replayErrorMsg != null)
            {
              // Mark the error in the ack
@@ -3287,7 +3286,7 @@
        msg.setSafeDataLevel(assuredSdLevel);
      // Add the assured message to the list of update that are waiting for acks
      waitingAckMsgs.put(msg.getChangeNumber(), msg);
      waitingAckMsgs.put(msg.getCSN(), msg);
    }
  }
@@ -3332,8 +3331,8 @@
    long startTime = System.currentTimeMillis();
    synchronized (msg)
    {
      ChangeNumber cn = msg.getChangeNumber();
      while (waitingAckMsgs.containsKey(cn))
      CSN csn = msg.getCSN();
      while (waitingAckMsgs.containsKey(csn))
      {
        try
        {
@@ -3359,8 +3358,7 @@
          remove the update from the wait list, log the timeout error and
          also update assured monitoring counters
          */
          UpdateMsg update;
          update = waitingAckMsgs.remove(cn);
          UpdateMsg update = waitingAckMsgs.remove(csn);
          if (update != null)
          {
@@ -3386,9 +3384,9 @@
              // Should not happen
            }
            throw new TimeoutException("No ack received for message cn: " + cn +
              " and replication servceID: " + baseDN + " after " +
              assuredTimeout + " ms.");
            throw new TimeoutException("No ack received for message csn: "
                + csn + " and replication servceID: " + baseDN + " after "
                + assuredTimeout + " ms.");
          } else
          {
            // Ack received just before timeout limit: we can exit
@@ -3413,7 +3411,7 @@
  {
    // Publish the update
    broker.publish(msg);
    state.update(msg.getChangeNumber());
    state.update(msg.getCSN());
    numSentUpdates.incrementAndGet();
  }
@@ -3428,7 +3426,7 @@
    UpdateMsg update;
    synchronized (this)
    {
      update = new UpdateMsg(generator.newChangeNumber(), msg);
      update = new UpdateMsg(generator.newCSN(), msg);
      /*
      If assured replication is configured, this will prepare blocking
      mechanism. If assured replication is disabled, this returns
@@ -3672,17 +3670,15 @@
    }
  }
  /**
   * Returns the ChangeNUmber of the last Change that was fully processed
   * by this ReplicationDomain.
   * Returns the CSN of the last Change that was fully processed by this
   * ReplicationDomain.
   *
   * @return The ChangeNUmber of the last Change that was fully processed
   *         by this ReplicationDomain.
   * @return The CSN of the last Change that was fully processed by this
   *         ReplicationDomain.
   */
  public ChangeNumber getLastLocalChange()
  public CSN getLastLocalChange()
  {
    return state.getChangeNumber(serverID);
    return state.getCSN(serverID);
  }
}