mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
01.23.2014 5d7be546948d1d019e3d29932b222d69412643dd
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -295,31 +295,11 @@
           *           restart as usual
           *   load this change on the delayList
           */
          DBCursor<UpdateMsg> cursor = null;
          try
          {
            // fill the lateQueue
            cursor = replicationServerDomain.getCursorFrom(serverState);
            while (cursor.next() && isLateQueueBelowThreshold())
            {
              lateQueue.add(cursor.getRecord());
            }
          }
          catch (ChangelogException e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
          finally
          {
            close(cursor);
          }
          /*
           * If the late queue is empty then we could not find any messages in
           * the replication log so the remote server is not late anymore.
           */
          fillLateQueue();
          if (lateQueue.isEmpty())
          {
            // we could not find any messages in the changelog
            // so the remote server is not late anymore.
            synchronized (msgQueue)
            {
              // Ensure we are below threshold so this server will follow the
@@ -333,8 +313,8 @@
          else
          {
            /*
             * if the first change in the lateQueue is also on the regular
             * queue, we can resume the processing from the regular queue
             * if the first change in the lateQueue is also on the regular queue,
             * we can resume the processing from the regular queue
             * -> set following to true and empty the lateQueue.
             */
            UpdateMsg msg = lateQueue.first();
@@ -356,7 +336,7 @@
        {
          // get the next change from the lateQueue
          UpdateMsg msg;
          synchronized (msgQueue)
          synchronized (msgQueue) // TODO JNR why synchronize(msgQueue) here?
          {
            msg = lateQueue.removeFirst();
          }
@@ -409,6 +389,27 @@
    return null;
  }
  private void fillLateQueue()
  {
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      cursor = replicationServerDomain.getCursorFrom(serverState);
      while (cursor.next() && isLateQueueBelowThreshold())
      {
        lateQueue.add(cursor.getRecord());
      }
    }
    catch (ChangelogException e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }
    finally
    {
      close(cursor);
    }
  }
  private boolean isLateQueueBelowThreshold()
  {
    return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
@@ -428,16 +429,14 @@
      {
        if (!msgQueue.isEmpty())
        {
          UpdateMsg msg = msgQueue.first();
          result = msg.getCSN();
          result = msgQueue.first().getCSN();
        }
      }
      else
      {
        if (!lateQueue.isEmpty())
        {
          UpdateMsg msg = lateQueue.first();
          result = msg.getCSN();
          result = lateQueue.first().getCSN();
        }
        else
        {