mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.59.2013 c210356ee7f6b0037e2d672a4a11e83bbd160b0e
MessageHandler.java:
Extracted methods isMsgQueueAboveThreshold() and isMsgQueueBelowThreshold().
Extracted MsgQueue.consumeUpTo(UpdateMsg).
Updated javadocs.

MsgQueue.java:
Added method consumeUpTo(UpdateMsg).
2 files modified
69 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java 45 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java 24 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -49,6 +49,9 @@
/**
 * This class implements a buffering/producer/consumer mechanism of
 * replication changes (UpdateMsg) used inside the replication server.
 * It logically represents another RS than the current one, and is used when the
 * other RS is brought online and needs to catch up with the changes in the
 * current RS.
 *
 * MessageHandlers are registered into Replication server domains.
 * When an update message is received by a domain, the domain forwards
@@ -156,21 +159,31 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      while (isMsgQueueAboveThreshold())
      {
        following = false;
        msgQueue.removeFirst();
      }
    }
  }
  private boolean isMsgQueueAboveThreshold()
  {
    return msgQueue.count() > maxQueueSize
        || msgQueue.bytesCount() > maxQueueBytesSize;
  }
  private boolean isMsgQueueBelowThreshold()
  {
    return !isMsgQueueAboveThreshold();
  }
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
@@ -300,16 +313,16 @@
          }
          /*
           * If the late queue is empty then we could not find any
           * messages in the replication log so the remote serevr is not
           * late anymore.
           * If the late queue is empty then we could not find any messages in
           * the replication log so the remote server is not late anymore.
           */
          if (lateQueue.isEmpty())
          {
            synchronized (msgQueue)
            {
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              // Ensure we are below threshold so this server will follow the
              // msgQueue without fearing the msgQueue gets trimmed
              if (isMsgQueueBelowThreshold())
              {
                following = true;
              }
@@ -330,13 +343,9 @@
                /* we finally catch up with the regular queue */
                following = true;
                lateQueue.clear();
                UpdateMsg msg1;
                do
                {
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getCSN().equals(msg1.getCSN()));
                msgQueue.consumeUpTo(msg);
                updateServerState(msg);
                return msg1;
                return msg;
              }
            }
          }
@@ -386,9 +395,9 @@
        }
      }
      /*
       * Need to loop because following flag may have gone to false between
       * the first check at the beginning of this method
       * and the second check just above.
       * Need to loop because following flag may have gone to false between the
       * first check at the beginning of this method and the second check just
       * above.
       */
    }
    return null;
@@ -497,7 +506,7 @@
        new TreeSet<ReplicaDBCursor>();
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server to get a cursor
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCsn = serverState.getCSN(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));
opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -27,7 +27,7 @@
 */
package org.opends.server.replication.server;
import java.util.SortedMap;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.opends.messages.Message;
@@ -43,7 +43,7 @@
 */
public class MsgQueue
{
  private SortedMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  private NavigableMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
  private final Object lock = new Object();
  /** The total number of bytes for all the message in the queue. */
@@ -192,4 +192,24 @@
      bytesCount = 0;
    }
  }
  /**
   * Consumes all the messages in this queue up to and including the passed in
   * message. If the passed in message is not contained in the current queue,
   * then all messages will be removed from it.
   *
   * @param msg
   *          the final message to reach when consuming messages from this queue
   */
  public void consumeUpTo(UpdateMsg msg)
  {
    UpdateMsg msg1;
    do
    {
      // FIXME this code could be more efficient if the msgQueue could call the
      // following code (to be tested):
      // map.headMap(msg.getCSN(), true).clear()
      msg1 = removeFirst();
    } while (!msg.getCSN().equals(msg1.getCSN()));
  }
}