mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
05.04.2009 9dc10dec2d5d7f61116f7f647b7cf9596ca77be0
opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -22,13 +22,17 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.messages.Message;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -41,6 +45,7 @@
{
  private SortedMap<ChangeNumber, UpdateMsg>  map =
    new TreeMap<ChangeNumber, UpdateMsg>();
  private final Object lock = new Object();
  // The total number of bytes for all the message in the queue.
  private int bytesCount = 0;
@@ -52,7 +57,10 @@
   */
  public UpdateMsg first()
  {
    return map.get(map.firstKey());
    synchronized (lock)
    {
      return map.get(map.firstKey());
    }
  }
  /**
@@ -62,7 +70,10 @@
   */
  public UpdateMsg last()
  {
    return map.get(map.lastKey());
    synchronized (lock)
    {
      return map.get(map.lastKey());
    }
  }
  /**
@@ -72,7 +83,10 @@
   */
  public int count()
  {
    return map.size();
    synchronized (lock)
    {
      return map.size();
    }
  }
  /**
@@ -82,7 +96,10 @@
   */
  public int bytesCount()
  {
    return bytesCount;
    synchronized (lock)
    {
      return bytesCount;
    }
  }
  /**
@@ -92,7 +109,10 @@
   */
  public boolean isEmpty()
  {
    return map.isEmpty();
    synchronized (lock)
    {
      return map.isEmpty();
    }
  }
@@ -103,8 +123,11 @@
   */
  public void add(UpdateMsg update)
  {
    map.put(update.getChangeNumber(), update);
    bytesCount += update.size();
    synchronized (lock)
    {
      map.put(update.getChangeNumber(), update);
      bytesCount += update.size();
    }
  }
  /**
@@ -114,10 +137,19 @@
   */
  public UpdateMsg removeFirst()
  {
    UpdateMsg msg = map.get(map.firstKey());
    map.remove(msg.getChangeNumber());
    bytesCount -= msg.size();
    return msg;
    synchronized (lock)
    {
      UpdateMsg update = map.get(map.firstKey());
      map.remove(update.getChangeNumber());
      bytesCount -= update.size();
      if ((map.size() == 0) && (bytesCount != 0))
      {
        Message msg = ERR_BYTE_COUNT.get();
        logError(msg);
        bytesCount = 0;
      }
      return update;
    }
  }
  /**
@@ -132,7 +164,10 @@
   */
  public boolean contains(UpdateMsg msg)
  {
    return map.containsKey(msg.getChangeNumber());
    synchronized (lock)
    {
      return map.containsKey(msg.getChangeNumber());
    }
  }
  /**
@@ -140,7 +175,10 @@
   */
  public void clear()
  {
    map.clear();
    bytesCount = 0;
    synchronized (lock)
    {
      map.clear();
      bytesCount = 0;
    }
  }
}