mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.57.2014 667d7253a3873ed64dafbffe39d8a84a298c1fdc
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -60,7 +60,7 @@
 * Message are buffered into a queue.
 * Consumers are expected to come and consume the UpdateMsg from the queue.
 */
public class MessageHandler extends MonitorProvider<MonitorProviderCfg>
class MessageHandler extends MonitorProvider<MonitorProviderCfg>
{
  /**
@@ -88,11 +88,11 @@
  /**
   * Number of update sent to the server.
   */
  protected int outCount = 0;
  private int outCount = 0;
  /**
   * Number of updates received from the server.
   */
  protected int inCount = 0;
  private int inCount = 0;
  /**
   * Specifies the max queue size for this handler.
   */
@@ -100,7 +100,7 @@
  /**
   * Specifies the max queue size in bytes for this handler.
   */
  protected int maxQueueBytesSize = maxQueueSize * 100;
  private int maxQueueBytesSize = maxQueueSize * 100;
  /**
   * Specifies whether the consumer is following the producer (is not late).
   */
@@ -130,7 +130,7 @@
   *                  in memory by this ServerHandler.
   * @param replicationServer The hosting replication server.
   */
  public MessageHandler(int queueSize, ReplicationServer replicationServer)
  MessageHandler(int queueSize, ReplicationServer replicationServer)
  {
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
@@ -144,7 +144,7 @@
   * @param update The update that must be added to the list of updates of
   * this handler.
   */
  public void add(UpdateMsg update)
  void add(UpdateMsg update)
  {
    synchronized (msgQueue)
    {
@@ -153,7 +153,9 @@
       * waiting for some changes, wake it up
       */
      if (msgQueue.isEmpty())
      {
        msgQueue.notify();
      }
      msgQueue.add(update);
@@ -183,7 +185,7 @@
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  boolean engageShutdown()
  {
    return shuttingDown.getAndSet(true);
  }
@@ -192,7 +194,7 @@
   * Returns the shutdown flag.
   * @return The shutdown flag value.
   */
  public boolean shuttingDown()
  boolean shuttingDown()
  {
    return shuttingDown.get();
  }
@@ -202,9 +204,8 @@
   *
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain(boolean waitConnections)
  private void setDomain(boolean waitConnections)
  {
    if (replicationServerDomain == null)
    {
@@ -214,14 +215,13 @@
        replicationServer.waitConnections();
      }
    }
    return replicationServerDomain;
  }
  /**
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  public int getInCount()
  int getInCount()
  {
    return inCount;
  }
@@ -375,10 +375,14 @@
            while (msgQueue.isEmpty() && following)
            {
              if (!synchronous)
              {
                return null;
              }
              msgQueue.wait(500);
              if (!activeConsumer)
              {
                return null;
              }
            }
          } catch (InterruptedException e)
          {
@@ -478,7 +482,7 @@
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  public int getOutCount()
  int getOutCount()
  {
    return outCount;
  }
@@ -545,7 +549,7 @@
  /**
   * Increase the counter of updates received from the server.
   */
  public void incrementInCount()
  void incrementInCount()
  {
    inCount++;
  }
@@ -553,14 +557,12 @@
  /**
   * Increase the counter of updates sent to the server.
   */
  public void incrementOutCount()
  void incrementOutCount()
  {
    outCount++;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
@@ -619,8 +621,8 @@
    else
    {
      this.baseDN = baseDN;
      if (!baseDN.toNormalizedString().equals("cn=changelog"))
        this.replicationServerDomain = getDomain(isDataServer);
      setDomain(!"cn=changelog".equals(baseDN.toNormalizedString())
            && isDataServer);
    }
  }
@@ -645,7 +647,7 @@
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean updateServerState(UpdateMsg msg)
  boolean updateServerState(UpdateMsg msg)
  {
    return serverState.update(msg.getCSN());
  }