mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.52.2013 d454b5f9a2b7dc4ef2a70cd983a26436568cbe04
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@
 */
package org.opends.server.replication.server;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
@@ -294,22 +295,19 @@
           *           restart as usual
           *   load this change on the delayList
           */
          NavigableSet<ReplicaDBCursor> sortedCursors = null;
          ReplicaDBCursor cursor = null;
          try
          {
            sortedCursors = collectAllCursorsWithChanges();
            // fill the lateQueue
            while (!sortedCursors.isEmpty()
                && lateQueue.count() < 100
                && lateQueue.bytesCount() < 50000)
            cursor = replicationServerDomain.getCursorFrom(serverState);
            while (cursor.next() && isLateQueueBelowThreshold())
            {
              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
              lateQueue.add(cursor.getChange());
            }
          }
          finally
          {
            close(sortedCursors);
            close(cursor);
          }
          /*
@@ -403,34 +401,9 @@
    return null;
  }
  private UpdateMsg nextOldestUpdateMsg(
      NavigableSet<ReplicaDBCursor> sortedCursors)
  private boolean isLateQueueBelowThreshold()
  {
    /*
     * The cursors are sorted based on the currentChange of each cursor to
     * consider the next change across all servers.
     * To keep consistent the order of the cursors in the SortedSet,
     * it is necessary to remove and eventually add again a cursor (after moving
     * it forward).
     */
    final ReplicaDBCursor cursor = sortedCursors.pollFirst();
    final UpdateMsg result = cursor.getChange();
    cursor.next();
    addCursorIfNotEmpty(sortedCursors, cursor);
    return result;
  }
  private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
      ReplicaDBCursor cursor)
  {
    if (cursor.getChange() != null)
    {
      cursors.add(cursor);
    }
    else
    {
      close(cursor);
    }
    return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
  }
  /**
@@ -476,12 +449,12 @@
  private CSN findOldestCSNFromReplicaDBs()
  {
    SortedSet<ReplicaDBCursor> sortedCursors = null;
    ReplicaDBCursor cursor = null;
    try
    {
      sortedCursors = collectAllCursorsWithChanges();
      UpdateMsg msg = sortedCursors.first().getChange();
      return msg.getCSN();
      cursor = replicationServerDomain.getCursorFrom(serverState);
      cursor.next();
      return cursor.getChange().getCSN();
    }
    catch (Exception e)
    {
@@ -489,32 +462,11 @@
    }
    finally
    {
      close(sortedCursors);
      close(cursor);
    }
  }
  /**
   * Collects all the {@link ReplicaDBCursor}s that have changes and sort them
   * with the oldest {@link CSN} first.
   *
   * @return a List of cursors with changes sorted by their {@link CSN}
   *         (oldest first)
   */
  private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
  {
    final NavigableSet<ReplicaDBCursor> results =
        new TreeSet<ReplicaDBCursor>();
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCsn = serverState.getCSN(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));
    }
    return results;
  }
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */