mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
24.44.2013 46fd9423ab622d7f9531aa1564846ec52fe09534
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -180,7 +180,7 @@
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      {
        setFollowing(false);
        following = false;
        msgQueue.removeFirst();
      }
    }
@@ -272,9 +272,9 @@
  protected UpdateMsg getNextMessage(boolean synchronous)
  {
    UpdateMsg msg;
    while (activeConsumer == true)
    while (activeConsumer)
    {
      if (following == false)
      if (!following)
      {
        /* this server is late with regard to some other masters
         * in the topology or just joined the topology.
@@ -376,7 +376,7 @@
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              {
                setFollowing(true);
                following = true;
              }
            }
          } else
@@ -392,7 +392,7 @@
              if (msgQueue.contains(msg))
              {
                /* we finally catch up with the regular queue */
                setFollowing(true);
                following = true;
                lateQueue.clear();
                UpdateMsg msg1;
                do
@@ -417,11 +417,11 @@
      }
      synchronized (msgQueue)
      {
        if (following == true)
        if (following)
        {
          try
          {
            while (msgQueue.isEmpty() && (following == true))
            while (msgQueue.isEmpty() && following)
            {
              if (!synchronous)
                return null;
@@ -465,7 +465,7 @@
    ChangeNumber result = null;
    synchronized (msgQueue)
    {
      if (isFollowing())
      if (following)
      {
        if (msgQueue.isEmpty())
        {
@@ -479,13 +479,14 @@
      {
        if (lateQueue.isEmpty())
        {
          // isFollowing is false AND lateQueue is empty
          // We may be at the very moment when the writer has emptyed the
          // lateQueue when it sent the last update. The writer will fill again
          // the lateQueue when it will send the next update but we are not yet
          // there. So let's take the last change not sent directly from
          // the db.
          /*
          following is false AND lateQueue is empty
          We may be at the very moment when the writer has emptied the
          lateQueue when it sent the last update. The writer will fill again
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from
          the db.
          */
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
@@ -500,9 +501,11 @@
              // get an iterator in this server db from that last change
              ReplicationIterator iterator =
                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
              // if that iterator has changes, then it is a candidate
              // it is added in the sorted list at a position given by its
              // current change (see ReplicationIteratorComparator).
              /*
              if that iterator has changes, then it is a candidate
              it is added in the sorted list at a position given by its
              current change (see ReplicationIteratorComparator).
              */
              if (iterator != null)
              {
                if (iterator.getChange() != null)
@@ -558,7 +561,7 @@
       * When the server is up to date or close to be up to date,
       * the number of updates to be sent is the size of the receive queue.
       */
      if (isFollowing())
      if (following)
        return msgQueue.count();
      else
      {
@@ -622,16 +625,6 @@
  }
  /**
   * Check if the LDAP server can follow the speed of the other servers.
   * @return true when the server has all the not yet sent changes
   *         in its queue.
   */
  public boolean isFollowing()
  {
    return following;
  }
  /**
   * Set that the consumer is now becoming inactive and thus getNextMessage
   * should not return any UpdateMsg any more.
   * @param active the provided state of the consumer.
@@ -641,14 +634,6 @@
    this.activeConsumer = active;
  }
  /**
   * Set the following flag of this server.
   * @param following the value that should be set.
   */
  private void setFollowing(boolean following)
  {
    this.following = following;
  }
  /**
   * Set the initial value of the serverState for this handler.