mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.59.2013 19298042f99ef0a659f9271d022f58884a18704b
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -49,6 +49,9 @@
/**
 * This class implements a buffering/producer/consumer mechanism of
 * replication changes (UpdateMsg) used inside the replication server.
 * It logically represents another RS than the current one, and is used when the
 * other RS is brought online and needs to catch up with the changes in the
 * current RS.
 *
 * MessageHandlers are registered into Replication server domains.
 * When an update message is received by a domain, the domain forwards
@@ -156,21 +159,31 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      while (isMsgQueueAboveThreshold())
      {
        following = false;
        msgQueue.removeFirst();
      }
    }
  }
  private boolean isMsgQueueAboveThreshold()
  {
    return msgQueue.count() > maxQueueSize
        || msgQueue.bytesCount() > maxQueueBytesSize;
  }
  private boolean isMsgQueueBelowThreshold()
  {
    return !isMsgQueueAboveThreshold();
  }
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
@@ -300,16 +313,16 @@
          }
          /*
           * If the late queue is empty then we could not find any
           * messages in the replication log so the remote serevr is not
           * late anymore.
           * If the late queue is empty then we could not find any messages in
           * the replication log so the remote server is not late anymore.
           */
          if (lateQueue.isEmpty())
          {
            synchronized (msgQueue)
            {
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              // Ensure we are below threshold so this server will follow the
              // msgQueue without fearing the msgQueue gets trimmed
              if (isMsgQueueBelowThreshold())
              {
                following = true;
              }
@@ -330,13 +343,9 @@
                /* we finally catch up with the regular queue */
                following = true;
                lateQueue.clear();
                UpdateMsg msg1;
                do
                {
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getCSN().equals(msg1.getCSN()));
                msgQueue.consumeUpTo(msg);
                updateServerState(msg);
                return msg1;
                return msg;
              }
            }
          }
@@ -386,9 +395,9 @@
        }
      }
      /*
       * Need to loop because following flag may have gone to false between
       * the first check at the beginning of this method
       * and the second check just above.
       * Need to loop because following flag may have gone to false between the
       * first check at the beginning of this method and the second check just
       * above.
       */
    }
    return null;
@@ -497,7 +506,7 @@
        new TreeSet<ReplicaDBCursor>();
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server to get a cursor
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCsn = serverState.getCSN(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));