mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
17.46.2006 46e6061d63562ce021ef8f3b5062d3eba1c2db4e
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -85,6 +85,7 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 10000;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
@@ -109,13 +110,16 @@
  /**
   * Creates a new server handler instance with the provided socket.
   *
   * @param  session The ProtocolSession used by the ServerHandler to
   * @param session The ProtocolSession used by the ServerHandler to
   *                 communicate with the remote entity.
   * @param queueSize The maximum number of update that will be kept
   *                  in memory by this ServerHandler.
   */
  public ServerHandler(ProtocolSession session)
  public ServerHandler(ProtocolSession session, int queueSize)
  {
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
  }
  /**
@@ -467,19 +471,50 @@
  {
   synchronized (msgQueue)
   {
     /*
      * TODO : When the server  is not able to follow, the msgQueue
      * may become too large and therefore won't contain all the
      * changes. Some changes may only be stored in the backing DB
      * of the servers.
      * The calculation should be done by asking to the each dbHandler
      * how many changes need to be replicated and making the sum
      * For now just return maxint in this case
      */
    /*
     * When the server is up to date or close to be up to date,
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.size();
     else
       return Integer.MAX_VALUE;
     {
       /*
        * When the server  is not able to follow, the msgQueue
        * may become too large and therefore won't contain all the
        * changes. Some changes may only be stored in the backing DB
        * of the servers.
        * The total size of teh receieve queue is calculated by doing
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = changelogCache.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
         if (currentChange != null)
         {
           int current = currentChange.getSeqnum();
           if (current == max)
           {
           }
           else if (current < max)
           {
             totalCount += max - current;
           }
           else
           {
             totalCount += Integer.MAX_VALUE - (current - max) + 1;
           }
         }
         else
         {
           totalCount += max;
         }
       }
       return totalCount;
     }
   }
  }
@@ -576,7 +611,7 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while (msgQueue.size() > 10000)
      while (msgQueue.size() > maxQueueSize)
      {
        following = false;
        msgQueue.removeFirst();
@@ -687,7 +722,7 @@
          {
            synchronized (msgQueue)
            {
              if (msgQueue.size() < 10000)
              if (msgQueue.size() < maxQueueSize)
              {
                following = true;
              }
@@ -1026,6 +1061,8 @@
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("max-waiting-changes",
                                 String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-waiting-acks",
                                 String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("update-sent",