mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
01.23.2014 5d7be546948d1d019e3d29932b222d69412643dd
opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2012-2013 ForgeRock AS.
 *      Portions copyright 2012-2014 ForgeRock AS.
 */
package org.opends.server.replication.server;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.opends.messages.Message;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -39,10 +37,18 @@
/**
 * This class is used to build ordered lists of UpdateMsg.
 * The order is defined by the order of the CSN of the UpdateMsg.
 * @ThreadSafe
 */
public class MsgQueue
{
  private NavigableMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  private TreeMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  /**
   * FIXME JNR to be investigated:
   * I strongly suspect that we could replace this field
   * by using the synchronized keyword on each method.
   * However, MessageHandler is weirdly synchronizing on msgQueue field
   * even though it is touching the lateQueue field (?!?).
   */
  private final Object lock = new Object();
  /** The total number of bytes for all the message in the queue. */
@@ -110,7 +116,7 @@
  {
    synchronized (lock)
    {
      UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
      final UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
      if (msgSameCSN != null)
      {
        try
@@ -121,14 +127,13 @@
          {
            // Adding 2 msgs with the same CSN is ok only when
            // the 2 msgs are the same
            bytesCount += (update.size() - msgSameCSN.size());
            Message errMsg = ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN.get(
            bytesCount += update.size() - msgSameCSN.size();
            logError(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN.get(
                msgSameCSN.getCSN().toString(),
                msgSameCSN.toString(), update.toString());
            logError(errMsg);
                msgSameCSN.toString(), update.toString()));
          }
        }
        catch(Exception e)
        catch (Exception e)
        {}
      }
      else
@@ -148,14 +153,15 @@
  {
    synchronized (lock)
    {
      UpdateMsg update = map.get(map.firstKey());
      // FIXME JNR replace next 2 lines with just that one:
      // final UpdateMsg update = map.pollFirstEntry().getValue();
      final UpdateMsg update = map.get(map.firstKey());
      map.remove(update.getCSN());
      bytesCount -= update.size();
      if ((map.size() == 0) && (bytesCount != 0))
      if (map.isEmpty() && bytesCount != 0)
      {
        // should never happen
        Message msg = ERR_BYTE_COUNT.get(Integer.toString(bytesCount));
        logError(msg);
        logError(ERR_BYTE_COUNT.get(Integer.toString(bytesCount)));
        bytesCount = 0;
      }
      return update;
@@ -197,18 +203,33 @@
   * message. If the passed in message is not contained in the current queue,
   * then all messages will be removed from it.
   *
   * @param msg
   * @param finalMsg
   *          the final message to reach when consuming messages from this queue
   */
  public void consumeUpTo(UpdateMsg msg)
  public void consumeUpTo(UpdateMsg finalMsg)
  {
    UpdateMsg msg1;
    // FIXME this code could be more efficient if the msgQueue could call the
    // following code (to be tested):
    // if (!map.containsKey(finalMsg.getCSN())) {
    // map.clear();
    // } else {
    // map.headMap(finalMsg.getCSN(), true).clear();
    // }
    final CSN finalCSN = finalMsg.getCSN();
    UpdateMsg msg;
    do
    {
      // FIXME this code could be more efficient if the msgQueue could call the
      // following code (to be tested):
      // map.headMap(msg.getCSN(), true).clear()
      msg1 = removeFirst();
    } while (!msg.getCSN().equals(msg1.getCSN()));
      msg = removeFirst();
    }
    while (!finalCSN.equals(msg.getCSN()));
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue="
        + map.values();
  }
}