mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -27,28 +27,26 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.messages.Message;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
/**
 * This class is used to build ordered lists of UpdateMsg.
 * The order is defined by the order of the ChangeNumber of the UpdateMsg.
 * The order is defined by the order of the CSN of the UpdateMsg.
 */
public class MsgQueue
{
  private SortedMap<ChangeNumber, UpdateMsg>  map =
    new TreeMap<ChangeNumber, UpdateMsg>();
  private SortedMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  private final Object lock = new Object();
  // The total number of bytes for all the message in the queue.
  /** The total number of bytes for all the message in the queue. */
  private int bytesCount = 0;
  /**
@@ -113,21 +111,21 @@
  {
    synchronized (lock)
    {
      UpdateMsg msgSameChangeNumber = map.put(update.getChangeNumber(), update);
      if (msgSameChangeNumber != null)
      UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
      if (msgSameCSN != null)
      {
        try
        {
          if (msgSameChangeNumber.getBytes().length != update.getBytes().length
              || msgSameChangeNumber.isAssured() != update.isAssured()
              || msgSameChangeNumber.getVersion() != update.getVersion())
          if (msgSameCSN.getBytes().length != update.getBytes().length
              || msgSameCSN.isAssured() != update.isAssured()
              || msgSameCSN.getVersion() != update.getVersion())
          {
            // Adding 2 msgs with the same ChangeNumber is ok only when
            // Adding 2 msgs with the same CSN is ok only when
            // the 2 msgs are the same
            bytesCount += (update.size() - msgSameChangeNumber.size());
            bytesCount += (update.size() - msgSameCSN.size());
            Message errMsg = ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN.get(
                msgSameChangeNumber.getChangeNumber().toString(),
                msgSameChangeNumber.toString(), update.toString());
                msgSameCSN.getCSN().toString(),
                msgSameCSN.toString(), update.toString());
            logError(errMsg);
          }
        }
@@ -152,7 +150,7 @@
    synchronized (lock)
    {
      UpdateMsg update = map.get(map.firstKey());
      map.remove(update.getChangeNumber());
      map.remove(update.getCSN());
      bytesCount -= update.size();
      if ((map.size() == 0) && (bytesCount != 0))
      {
@@ -167,19 +165,19 @@
  /**
   * Returns <tt>true</tt> if this map contains an UpdateMsg
   * with the same ChangeNumber as the given UpdateMsg.
   * with the same CSN as the given UpdateMsg.
   *
   * @param msg UpdateMsg whose presence in this queue is to be tested.
   *
   * @return <tt>true</tt> if this map contains an UpdateMsg
   *         with the same ChangeNumber as the given UpdateMsg.
   *         with the same CSN as the given UpdateMsg.
   *
   */
  public boolean contains(UpdateMsg msg)
  {
    synchronized (lock)
    {
      return map.containsKey(msg.getChangeNumber());
      return map.containsKey(msg.getCSN());
    }
  }