mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jan-Peter Nilsson
09.31.2020 029beee63ef0fa2eb3915241c3b3c866d0355b6c
FIX Replication not catching up properly if there are many pending
changes https://github.com/OpenIdentityPlatform/OpenDJ/issues/141

Don't move to following mode if lateQueue can be refilled

Catching up when more than 100 changes behind was broken by 7160f59040.
We read chunks of 100 entries into lateQueue, so an empty lateQueue does
not necessarily mean that we are done, only that the current chunk of
100 has been processed.
We need to check whether lateQueue can be refilled before deciding we
are done.
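
The point above can be illustrated with a minimal sketch. It is not the
OpenDJ code: RefillSketch, CHUNK, refill, catchUp and the integer change
numbers are hypothetical stand-ins for the late queue threshold,
fillLateQueue() and CSNs. It only shows that an empty queue after one chunk
means a refill is due, and that we are done once a refill finds nothing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model, not the OpenDJ classes: changes are plain ints 1..n.
final class RefillSketch {
    static final int CHUNK = 100; // assumed chunk size, taken from the text above

    // Builds an ascending history 1..n.
    static int[] historyOf(int n) {
        int[] history = new int[n];
        for (int i = 0; i < n; i++) {
            history[i] = i + 1;
        }
        return history;
    }

    // Adds changes newer than lastSeen to the queue until it holds CHUNK entries.
    static void refill(Deque<Integer> queue, int[] history, int lastSeen) {
        for (int change : history) {
            if (queue.size() >= CHUNK) {
                break;
            }
            if (change > lastSeen) {
                queue.add(change);
            }
        }
    }

    // Replays the whole history and returns the number of non-empty refills.
    static int catchUp(int[] history) {
        Deque<Integer> queue = new ArrayDeque<>();
        int lastSeen = 0;
        int refills = 0;
        while (true) {
            refill(queue, history, lastSeen);
            if (queue.isEmpty()) {
                return refills; // a refill found nothing: now we are really done
            }
            refills++;
            while (!queue.isEmpty()) {
                lastSeen = queue.removeFirst();
            }
            // The queue is empty here, yet the history may still hold more changes.
        }
    }

    public static void main(String[] args) {
        System.out.println(catchUp(historyOf(250)));
    }
}
```

With 250 pending changes the sketch needs three refills (100, 100 and 50
entries). Stopping at the first empty queue would leave 150 changes behind.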

Workaround for ReplicaOfflineMsg in lateQueue

OpenDJ has two queues for replication playback.
- lateQueue which is gradually filled from history
- msgQueue which is filled with new changes while on-line

At startup OpenDJ starts by processing lateQueue, refilling lateQueue
until we run out of changes or we find a change that is also in
msgQueue, at which point processing can switch over to msgQueue.
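
The handoff described above can be sketched roughly as follows. This is a
simplified model under stated assumptions, not the MessageHandler code:
HandoffSketch and replay are hypothetical names, changes are plain integers,
and the history is a list read in chunks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the lateQueue to msgQueue handoff.
final class HandoffSketch {

    // Returns the changes in the order they would be played back.
    static List<Integer> replay(List<Integer> history, Deque<Integer> msgQueue, int chunk) {
        List<Integer> sent = new ArrayList<>();
        Deque<Integer> lateQueue = new ArrayDeque<>();
        int next = 0; // index of the next unread history entry
        while (true) {
            if (lateQueue.isEmpty()) {
                // Refill lateQueue from history, one chunk at a time.
                while (next < history.size() && lateQueue.size() < chunk) {
                    lateQueue.add(history.get(next++));
                }
            }
            // Stop when history is exhausted or the head is already in msgQueue.
            if (lateQueue.isEmpty() || msgQueue.contains(lateQueue.peekFirst())) {
                break;
            }
            sent.add(lateQueue.removeFirst());
        }
        // From here on, only the live queue is served.
        while (!msgQueue.isEmpty()) {
            sent.add(msgQueue.removeFirst());
        }
        return sent;
    }

    public static void main(String[] args) {
        Deque<Integer> live = new ArrayDeque<>(List.of(4, 5, 6));
        System.out.println(replay(List.of(1, 2, 3, 4, 5), live, 2));
    }
}
```

In this model changes 1 to 3 come from history, and the switch happens when
change 4 is found at the head of lateQueue and also in msgQueue.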

The problem we run into is when we have DSes that are offline: we then
have ReplicaOfflineMsg messages in lateQueue.
ReplicaOfflineMsg does not affect domainState, and the refill of
lateQueue is based on domainState, so if the last entry in lateQueue is
a ReplicaOfflineMsg we will never move past it.
If msgQueue is empty the current code manages to step out, but
if something does arrive in msgQueue it will never be able to step
from lateQueue to msgQueue.
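
A minimal sketch of why the refill stalls, assuming the refill reads every
message newer than the current state and that only contributing messages
advance that state. StallSketch, Msg, refill, process and stalledRounds are
hypothetical names, and the integer state stands in for domainState.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: an int state stands in for domainState.
final class StallSketch {

    // Stand-in for UpdateMsg: a change number plus the flag named in the diff.
    static final class Msg {
        final int csn;
        final boolean contributesToDomainState;

        Msg(int csn, boolean contributesToDomainState) {
            this.csn = csn;
            this.contributesToDomainState = contributesToDomainState;
        }
    }

    // Reads every message newer than the state, like a cursor opened from it.
    static List<Msg> refill(List<Msg> history, int state) {
        List<Msg> batch = new ArrayList<>();
        for (Msg m : history) {
            if (m.csn > state) {
                batch.add(m);
            }
        }
        return batch;
    }

    // Processes one batch; only contributing messages advance the state.
    static int process(List<Msg> batch, int state) {
        for (Msg m : batch) {
            if (m.contributesToDomainState) {
                state = Math.max(state, m.csn);
            }
        }
        return state;
    }

    // Counts rounds where a non-empty batch left the state unchanged.
    static int stalledRounds(List<Msg> history, int maxRounds) {
        int state = 0;
        int stalled = 0;
        for (int round = 0; round < maxRounds; round++) {
            List<Msg> batch = refill(history, state);
            if (batch.isEmpty()) {
                break;
            }
            int newState = process(batch, state);
            if (newState == state) {
                stalled++; // the same messages will be read again next round
            }
            state = newState;
        }
        return stalled;
    }
}
```

With a history ending in a non-contributing message, every round after the
first re-reads that same message, so the model never reaches an empty refill.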

After removing a member, that DS will forever be part of the replication
data as offline.

To work around this problem, this patch pushes the messages from
lateQueue to msgQueue if:
- we just refilled lateQueue, and
- lateQueue is not full, so another refill is not likely to give us
anything else, and
- lateQueue only contains ReplicaOfflineMsg messages (no message for
which contributesToDomainState() is true)

This allows us to move from processing lateQueue to msgQueue.
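
The three conditions above and the resulting move can be sketched as below.
This is an illustration under assumptions, not the patched method: DrainSketch,
Msg, shouldDrain and drain are hypothetical names, and capacity stands in for
the late queue threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the workaround.
final class DrainSketch {

    // Stand-in for UpdateMsg, carrying only the flag that matters here.
    static final class Msg {
        final String name;
        final boolean contributesToDomainState;

        Msg(String name, boolean contributesToDomainState) {
            this.name = name;
            this.contributesToDomainState = contributesToDomainState;
        }
    }

    // True when all three conditions from the list hold.
    static boolean shouldDrain(boolean justRefilled, Deque<Msg> lateQueue, int capacity) {
        if (!justRefilled || lateQueue.isEmpty() || lateQueue.size() >= capacity) {
            return false;
        }
        for (Msg m : lateQueue) {
            if (m.contributesToDomainState) {
                return false;
            }
        }
        return true;
    }

    // Moves everything from lateQueue to msgQueue in order; returns the count.
    static int drain(Deque<Msg> lateQueue, Deque<Msg> msgQueue) {
        int moved = 0;
        while (!lateQueue.isEmpty()) {
            msgQueue.add(lateQueue.removeFirst());
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        Deque<Msg> late = new ArrayDeque<>();
        late.add(new Msg("offline-1", false));
        Deque<Msg> live = new ArrayDeque<>();
        live.add(new Msg("add-42", true));
        if (shouldDrain(true, late, 100)) {
            drain(late, live);
        }
        System.out.println(late.size() + " " + live.size());
    }
}
```

Once lateQueue has been drained, every former lateQueue entry is in msgQueue,
so the usual "is this change also in msgQueue" check can succeed.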

A limitation is that this only works as long as we do not get more
ReplicaOfflineMsg messages than fit in lateQueue.
The default queue size is 100, so I expect it to work when we have
fewer than 100 offline DSes. It will stop working if we reach 100
offline DSes, e.g. due to repeatedly adding and removing servers.
1 files modified
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -258,7 +258,7 @@
           *           restart as usual
           *   load this change on the delayList
           */
-         fillLateQueue();
+         boolean queueContributesToDomainState = fillLateQueue();
          if (lateQueue.isEmpty())
          {
            // we could not find any messages in the changelog
@@ -283,6 +283,15 @@
            UpdateMsg msg = lateQueue.first();
            synchronized (msgQueue)
            {
+             if (!queueContributesToDomainState)
+             {
+              // If nothing in the queue contributesToDomainState, add it all to msgQueue so we can get out of here
+              while(!lateQueue.isEmpty())
+              {
+                msgQueue.add(lateQueue.removeFirst());
+              }
+             }
              if (msgQueue.contains(msg))
              {
                /* we finally catch up with the regular queue */
@@ -307,9 +316,14 @@
            // By default a server is always not following. A weird case where messages not representing
            // an operation may happen, making the late queue repeatedly fill and be emptied without ever
            // getting the server out of state "not following".
            if (lateQueue.isEmpty() && msgQueue.isEmpty())
            {
-             following = true;
+             CSN nextChange = findOldestCSNFromReplicaDBs();
+             if (nextChange == null)
+             {
+               following = true;
+             }
            }
          }
          if (updateServerState(msg))
@@ -363,15 +377,25 @@
   * Fills the late queue with the most recent changes, accepting only the
   * messages from provided replica ids.
   */
- private void fillLateQueue() throws ChangelogException
+ private boolean fillLateQueue() throws ChangelogException
  {
+   boolean contributesToDomainState = false;
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
      while (cursor.next() && isLateQueueBelowThreshold())
      {
-       lateQueue.add(cursor.getRecord());
+       UpdateMsg msg = cursor.getRecord();
+       lateQueue.add(msg);
+       contributesToDomainState |= msg.contributesToDomainState();
      }
    }
+   // If we stopped because we filled the queue, set contributesToDomainState to true
+   if (!contributesToDomainState && !isLateQueueBelowThreshold())
+   {
+     contributesToDomainState = true;
+   }
+   return contributesToDomainState;
  }
  private boolean isLateQueueBelowThreshold()