mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.07.2013 9603e9270e1350abd3248e6187d4d4bb0a8fee1d
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
  {
    while (activeConsumer)
    {
         UpdateMsg msg;
      if (!following)
      {
        /* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
           *           restart as usual
           *   load this change on the delayList
           */
               NavigableSet<ReplicationDBCursor> sortedCursors = null;
          NavigableSet<ReplicationDBCursor> sortedCursors = null;
          try
          {
            sortedCursors = collectAllCursorsWithChanges();
                  // fill the lateQueue
            // fill the lateQueue
            while (!sortedCursors.isEmpty()
                && lateQueue.count() < 100
                && lateQueue.bytesCount() < 50000)
            {
                     lateQueue.add(nextOldestUpdateMsg(sortedCursors));
              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
            }
          }
          finally
@@ -315,15 +314,15 @@
                following = true;
              }
            }
               }
               else
          }
          else
          {
            /*
             * if the first change in the lateQueue is also on the regular
             * queue, we can resume the processing from the regular queue
             * -> set following to true and empty the lateQueue.
             */
            msg = lateQueue.first();
            UpdateMsg msg = lateQueue.first();
            synchronized (msgQueue)
            {
              if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
              }
            }
          }
            }
            else
        }
        else
        {
               // get the next change from the lateQueue
          // get the next change from the lateQueue
          UpdateMsg msg;
          synchronized (msgQueue)
          {
            msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
          return msg;
        }
      }
      synchronized (msgQueue)
      {
        if (following)
@@ -371,8 +373,7 @@
          {
            return null;
          }
          msg = msgQueue.removeFirst();
          UpdateMsg msg = msgQueue.removeFirst();
          if (updateServerState(msg))
          {
            /*
@@ -393,38 +394,38 @@
    return null;
  }
   private UpdateMsg nextOldestUpdateMsg(
         NavigableSet<ReplicationDBCursor> sortedCursors)
   {
      /*
       * The cursors are sorted based on the currentChange of each cursor to
       * consider the next change across all servers.
       * To keep consistent the order of the cursors in the SortedSet,
       * it is necessary to remove and eventually add again a cursor (after moving
       * it forward).
       */
      final ReplicationDBCursor cursor = sortedCursors.pollFirst();
      final UpdateMsg result = cursor.getChange();
      cursor.next();
      addCursorIfNotEmpty(sortedCursors, cursor);
      return result;
   }
  private UpdateMsg nextOldestUpdateMsg(
      NavigableSet<ReplicationDBCursor> sortedCursors)
  {
    /*
     * The cursors are sorted based on the currentChange of each cursor to
     * consider the next change across all servers.
     * To keep consistent the order of the cursors in the SortedSet,
     * it is necessary to remove and eventually add again a cursor (after moving
     * it forward).
     */
    final ReplicationDBCursor cursor = sortedCursors.pollFirst();
    final UpdateMsg result = cursor.getChange();
    cursor.next();
    addCursorIfNotEmpty(sortedCursors, cursor);
    return result;
  }
   private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
         ReplicationDBCursor cursor)
   {
      if (cursor != null)
      {
         if (cursor.getChange() != null)
         {
            cursors.add(cursor);
         }
         else
         {
            close(cursor);
         }
      }
   }
  private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
      ReplicationDBCursor cursor)
  {
    if (cursor != null)
    {
      if (cursor.getChange() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        close(cursor);
      }
    }
  }
  /**
   * Get the older Change Number for that server.
@@ -460,49 +461,49 @@
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from the db.
          */
               result = findOldestChangeNumberFromReplicationDBs();
          result = findOldestChangeNumberFromReplicationDBs();
        }
      }
    }
    return result;
  }
   private ChangeNumber findOldestChangeNumberFromReplicationDBs()
   {
      SortedSet<ReplicationDBCursor> sortedCursors = null;
      try
      {
        sortedCursors = collectAllCursorsWithChanges();
         UpdateMsg msg = sortedCursors.first().getChange();
         return msg.getChangeNumber();
      }
      catch (Exception e)
      {
         return null;
      }
      finally
      {
        close(sortedCursors);
      }
   }
   /**
    * Collects all the replication DB cursors that have changes and sort them
    * with the oldest {@link ChangeNumber} first.
    *
    * @return a List of cursors with changes sorted by their {@link ChangeNumber}
    *         (oldest first)
    */
   private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  private ChangeNumber findOldestChangeNumberFromReplicationDBs()
  {
      final NavigableSet<ReplicationDBCursor> results =
            new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    SortedSet<ReplicationDBCursor> sortedCursors = null;
    try
    {
      sortedCursors = collectAllCursorsWithChanges();
      UpdateMsg msg = sortedCursors.first().getChange();
      return msg.getChangeNumber();
    }
    catch (Exception e)
    {
      return null;
    }
    finally
    {
      close(sortedCursors);
    }
  }
  /**
   * Collects all the replication DB cursors that have changes and sort them
   * with the oldest {@link ChangeNumber} first.
   *
   * @return a List of cursors with changes sorted by their {@link ChangeNumber}
   *         (oldest first)
   */
  private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  {
    final NavigableSet<ReplicationDBCursor> results =
        new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server to get a cursor
         final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
         addCursorIfNotEmpty(results,
               replicationServerDomain.getCursorFrom(serverId, lastCsn));
      final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));
    }
    return results;
  }