mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.40.2008 66f48672bc7953e77364a9a7ae41f1e70d83534f
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 10000;
  private int maxQueueSize = 5000;
  private int maxQueueBytesSize = maxQueueSize * 100;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
@@ -115,8 +116,8 @@
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controled and should
                                       // be stopped from sending messsages.
                                       // flow controlled and should
                                       // be stopped from sending messages.
  private int saturationCount = 0;
  private short replicationServerId;
@@ -165,6 +166,7 @@
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
    this.protocolVersion = ProtocolVersion.currentVersion();
  }
@@ -305,7 +307,7 @@
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the negociation */
        /* Until here session is encrypted then it depends on the negotiation */
        if (!sslEncryption)
        {
          session.stopEncryption();
@@ -352,7 +354,7 @@
        }
        else
        {
          // We are an empty Replicationserver
          // We are an empty Replication Server
          if ((generationId>0)&&(!serverState.isEmpty()))
          {
            // If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
  {
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      int size = msgQueue.count();
      if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
        return true;
@@ -764,7 +766,7 @@
  {
    synchronized (msgQueue)
    {
      int queueSize = msgQueue.size();
      int queueSize = msgQueue.count();
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.size();
       return msgQueue.count();
     else
     {
       /*
@@ -1034,7 +1036,8 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while (msgQueue.size() > maxQueueSize)
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      {
        setFollowing(false);
        msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change accross all servers.
          // change across all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          while (!iteratorSortedSet.isEmpty() &&
                 (lateQueue.count()<100) &&
                 (lateQueue.bytesCount()<50000) )
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
          {
            synchronized (msgQueue)
            {
              if (msgQueue.size() < maxQueueSize)
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              {
                setFollowing(true);
              }
@@ -1203,7 +1209,7 @@
            {
              if (msgQueue.contains(msg))
              {
                /* we finally catched up with the regular queue */
                /* we finally catch up with the regular queue */
                setFollowing(true);
                lateQueue.clear();
                UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
   * Update the serverState with the last message sent.
   *
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningfull.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean  updateServerState(UpdateMessage msg)
  {
@@ -1599,6 +1605,15 @@
      }
    }
    attributes.add(
        new Attribute("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        new Attribute(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(
        new Attribute(
            "following", String.valueOf(following)));
    // Deprecated
    attributes.add(new Attribute("max-waiting-changes",
                                  String.valueOf(maxQueueSize)));