mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.17.2014 bdcbee41067e1f439a8ebe7fd0a9dd01799b593a
opendj3-server-dev/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.server;
import java.util.NavigableMap;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.util.TreeMap;
@@ -38,13 +37,21 @@
/**
 * This class is used to build ordered lists of UpdateMsg.
 * The order is defined by the order of the CSN of the UpdateMsg.
 * @ThreadSafe
 */
public class MsgQueue
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private NavigableMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  private TreeMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  /**
   * FIXME JNR to be investigated:
   * I strongly suspect that we could replace this field
   * by using the synchronized keyword on each method.
   * However, MessageHandler is weirdly synchronizing on msgQueue field
   * even though it is touching the lateQueue field (?!?).
   */
  private final Object lock = new Object();
  /** The total number of bytes for all the message in the queue. */
@@ -112,7 +119,7 @@
  {
    synchronized (lock)
    {
      UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
      final UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
      if (msgSameCSN != null)
      {
        try
@@ -123,11 +130,11 @@
          {
            // Adding 2 msgs with the same CSN is ok only when
            // the 2 msgs are the same
            bytesCount += (update.size() - msgSameCSN.size());
            bytesCount += update.size() - msgSameCSN.size();
            logger.error(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN, msgSameCSN.getCSN(), msgSameCSN, update);
          }
        }
        catch(Exception e)
        catch (Exception e)
        {
          logger.traceException(e);
        }
@@ -149,10 +156,12 @@
  {
    synchronized (lock)
    {
      UpdateMsg update = map.get(map.firstKey());
      // FIXME JNR replace next 2 lines with just that one:
      // final UpdateMsg update = map.pollFirstEntry().getValue();
      final UpdateMsg update = map.get(map.firstKey());
      map.remove(update.getCSN());
      bytesCount -= update.size();
      if ((map.size() == 0) && (bytesCount != 0))
      if (map.isEmpty() && bytesCount != 0)
      {
        // should never happen
        logger.error(ERR_BYTE_COUNT, bytesCount);
@@ -197,18 +206,33 @@
   * message. If the passed in message is not contained in the current queue,
   * then all messages will be removed from it.
   *
   * @param msg
   * @param finalMsg
   *          the final message to reach when consuming messages from this queue
   */
  public void consumeUpTo(UpdateMsg msg)
  public void consumeUpTo(UpdateMsg finalMsg)
  {
    UpdateMsg msg1;
    // FIXME this code could be more efficient if the msgQueue could call the
    // following code (to be tested):
    // if (!map.containsKey(finalMsg.getCSN())) {
    // map.clear();
    // } else {
    // map.headMap(finalMsg.getCSN(), true).clear();
    // }
    final CSN finalCSN = finalMsg.getCSN();
    UpdateMsg msg;
    do
    {
      // FIXME this code could be more efficient if the msgQueue could call the
      // following code (to be tested):
      // map.headMap(msg.getCSN(), true).clear()
      msg1 = removeFirst();
    } while (!msg.getCSN().equals(msg1.getCSN()));
      msg = removeFirst();
    }
    while (!finalCSN.equals(msg.getCSN()));
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue="
        + map.values();
  }
}