mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
10.46.2015 c8da51116d2c6e8adcc3d4b78d573e84aac57a4e
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -28,7 +28,6 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.types.Attributes.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.List;
@@ -68,13 +67,11 @@
class MessageHandler extends MonitorProvider<MonitorProviderCfg>
{
  /** The logger of this class. */
  protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final int MINIMUM_TRESHOLD_MSG_QUEUE_SIZE = 5;
  /**
   * UpdateMsg queue.
   */
  /** UpdateMsg queue. */
  private final MsgQueue msgQueue = new MsgQueue();
  /**
   * Late queue. All access to the lateQueue in getNextMessage() is
@@ -82,41 +79,23 @@
   * need protecting against removals performed using getNextMessage().
   */
  private final MsgQueue lateQueue = new MsgQueue();
  /**
   * Local hosting RS.
   */
  protected ReplicationServer replicationServer;
  /**
   * Specifies the related replication server domain based on baseDN.
   */
  /** Local hosting RS. */
  protected final ReplicationServer replicationServer;
  /** Specifies the related replication server domain based on baseDN. */
  protected ReplicationServerDomain replicationServerDomain;
  /**
   * Number of update sent to the server.
   */
  /** Number of update sent to the server. */
  private int outCount;
  /**
   * Number of updates received from the server.
   */
  /** Number of updates received from the server. */
  private int inCount;
  /**
   * Specifies the max queue size for this handler.
   */
  protected int maxQueueSize = 5000;
  /**
   * Specifies the max queue size in bytes for this handler.
   */
  private int maxQueueBytesSize = maxQueueSize * 100;
  /**
   * Specifies whether the consumer is following the producer (is not late).
   */
  /** Specifies the max queue size for this handler. */
  protected final int maxQueueSize;
  /** Specifies the max queue size in bytes for this handler. */
  private final int maxQueueBytesSize;
  /** Specifies whether the consumer is following the producer (is not late). */
  private boolean following;
  /**
   * Specifies the current serverState of this handler.
   */
  /** Specifies the current serverState of this handler. */
  private ServerState serverState;
  /**
   * Specifies the baseDN of the domain.
   */
  /** Specifies the baseDN of the domain. */
  private DN baseDN;
  /**
   * Specifies whether the consumer is still active or not.
@@ -124,10 +103,8 @@
   * Called at the beginning of shutdown process.
   */
  private boolean activeConsumer = true;
  /**
   * Set when ServerHandler is stopping.
   */
  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
  /** Set when ServerHandler is stopping. */
  private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
  /**
   * Creates a new server handler instance with the provided socket.
@@ -164,9 +141,7 @@
      msgQueue.add(update);
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      // TODO : size should be configurable and larger than max-receive-queue-size
      while (isMsgQueueAboveThreshold())
      {
        following = false;
@@ -238,7 +213,7 @@
  @Override
  public List<Attribute> getMonitorData()
  {
    List<Attribute> attributes = new ArrayList<Attribute>();
    List<Attribute> attributes = new ArrayList<>();
    attributes.add(create("handler", getMonitorInstanceName()));
    attributes.add(create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
@@ -359,7 +334,6 @@
        }
      }
      synchronized (msgQueue)
      {
        if (following)
@@ -409,10 +383,8 @@
   */
  private void fillLateQueue(Set<Integer> connectedReplicaIds)
  {
    DBCursor<UpdateMsg> cursor = null;
    try
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
      cursor = replicationServerDomain.getCursorFrom(serverState);
      while (cursor.next() && isLateQueueBelowThreshold())
      {
        final UpdateMsg record = cursor.getRecord();
@@ -426,10 +398,6 @@
    {
      logger.traceException(e);
    }
    finally
    {
      close(cursor);
    }
  }
  private boolean isLateQueueBelowThreshold()
@@ -477,10 +445,8 @@
  private CSN findOldestCSNFromReplicaDBs()
  {
    DBCursor<UpdateMsg> cursor = null;
    try
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
      cursor = replicationServerDomain.getCursorFrom(serverState);
      while (cursor.next())
      {
        final UpdateMsg record = cursor.getRecord();
@@ -495,10 +461,6 @@
    {
      return null;
    }
    finally
    {
      close(cursor);
    }
  }
  /**
@@ -559,17 +521,13 @@
    return baseDN;
  }
  /**
   * Increase the counter of updates received from the server.
   */
  /** Increase the counter of updates received from the server. */
  void incrementInCount()
  {
    inCount++;
  }
  /**
   * Increase the counter of updates sent to the server.
   */
  /** Increase the counter of updates sent to the server. */
  void incrementOutCount()
  {
    outCount++;
@@ -593,7 +551,6 @@
    this.activeConsumer = active;
  }
  /**
   * Set the initial value of the serverState for this handler.
   * Expected to be done once, then the state will be updated using
@@ -607,7 +564,6 @@
    this.serverState = serverState;
  }
  /**
   * Set the baseDN for this handler. Expected to be done once and never changed
   * during the handler life.
@@ -638,9 +594,7 @@
    }
  }
  /**
   * Shutdown this handler.
   */
  /** Shutdown this handler. */
  public void shutdown()
  {
    synchronized (msgQueue)
@@ -692,7 +646,6 @@
    return this.replicationServer.getServerURL();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {