mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jan-Peter Nilsson
09.31.2020 029beee63ef0fa2eb3915241c3b3c866d0355b6c
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -258,7 +258,7 @@
           *           restart as usual
           *   load this change on the delayList
           */
          fillLateQueue();
          boolean queueContributesToDomainState = fillLateQueue();
          if (lateQueue.isEmpty())
          {
            // we could not find any messages in the changelog
@@ -283,6 +283,15 @@
            UpdateMsg msg = lateQueue.first();
            synchronized (msgQueue)
            {
              if (!queueContributesToDomainState)
              {
               // If nothing in the queue contributesToDomainState, add it all to msgQueue so we can get out of here
               while(!lateQueue.isEmpty())
               {
                 msgQueue.add(lateQueue.removeFirst());
               }
              }
              if (msgQueue.contains(msg))
              {
                /* we finally catch up with the regular queue */
@@ -307,9 +316,14 @@
            // By default a server is always not following. A weird case where messages not representing
            // an operation may happen, making the late queue repeatedly fill and be emptied without ever
            // getting the server out of state "not following".
            if (lateQueue.isEmpty() && msgQueue.isEmpty())
            {
              following = true;
              CSN nextChange = findOldestCSNFromReplicaDBs();
              if (nextChange == null)
              {
                following = true;
              }
            }
          }
          if (updateServerState(msg))
@@ -363,15 +377,25 @@
   * Fills the late queue with the most recent changes, accepting only the
   * messages from provided replica ids.
   */
  private void fillLateQueue() throws ChangelogException
  private boolean fillLateQueue() throws ChangelogException
  {
    boolean contributesToDomainState = false;
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
      while (cursor.next() && isLateQueueBelowThreshold())
      {
        lateQueue.add(cursor.getRecord());
        UpdateMsg msg = cursor.getRecord();
        lateQueue.add(msg);
        contributesToDomainState |= msg.contributesToDomainState();
      }
    }
    // If we stopped because we filled the queue, set contributesToDomainState to true
    if (!contributesToDomainState && !isLateQueueBelowThreshold())
    {
      contributesToDomainState = true;
    }
    return contributesToDomainState;
  }
  private boolean isLateQueueBelowThreshold()