mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.56.2013 bd3c137fd2e1fa9e13289ab0573e07f9a4212e05
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,10 +27,7 @@
 */
package org.opends.server.replication.server;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
@@ -253,9 +250,9 @@
   */
  protected UpdateMsg getNextMessage(boolean synchronous)
  {
    UpdateMsg msg;
    while (activeConsumer)
    {
         UpdateMsg msg;
      if (!following)
      {
        /* this server is late with regard to some other masters
@@ -285,32 +282,22 @@
           *           restart as usual
           *   load this change on the delayList
           */
          SortedSet<ReplicationIterator> iteratorSortedSet = null;
               NavigableSet<ReplicationDBCursor> sortedCursors = null;
          try
          {
            iteratorSortedSet = collectAllIteratorsWithChanges();
            sortedCursors = collectAllCursorsWithChanges();
            /* fill the lateQueue */
            // The loop below relies on the fact that it is sorted based
            // on the currentChange of each iterator to consider the next
            // change across all servers.
            //
            // Hence it is necessary to remove and eventual add again an
            // iterator when looping in order to keep consistent the order of
            // the iterators (see ReplicationIteratorComparator.
            while (!iteratorSortedSet.isEmpty()
                && (lateQueue.count() < 100)
                && (lateQueue.bytesCount() < 50000))
                  // fill the lateQueue
            while (!sortedCursors.isEmpty()
                && lateQueue.count() < 100
                && lateQueue.bytesCount() < 50000)
            {
              ReplicationIterator iterator = iteratorSortedSet.first();
              iteratorSortedSet.remove(iterator);
              lateQueue.add(iterator.getChange());
              addIteratorIfNotEmpty(iteratorSortedSet, iterator);
                     lateQueue.add(nextOldestUpdateMsg(sortedCursors));
            }
          }
          finally
          {
            close(iteratorSortedSet);
            close(sortedCursors);
          }
          /*
@@ -328,7 +315,8 @@
                following = true;
              }
            }
          } else
               }
               else
          {
            /*
             * if the first change in the lateQueue is also on the regular
@@ -353,9 +341,10 @@
              }
            }
          }
        } else
            }
            else
        {
          /* get the next change from the lateQueue */
               // get the next change from the lateQueue
          synchronized (msgQueue)
          {
            msg = lateQueue.removeFirst();
@@ -404,18 +393,38 @@
    return null;
  }
  private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
      ReplicationIterator iter)
  {
    if (iter.next())
    {
      iterators.add(iter);
    }
    else
    {
      close(iter);
    }
  }
   private UpdateMsg nextOldestUpdateMsg(
         NavigableSet<ReplicationDBCursor> sortedCursors)
   {
      /*
       * The cursors are sorted based on the currentChange of each cursor to
       * consider the next change across all servers.
       * To keep consistent the order of the cursors in the SortedSet,
       * it is necessary to remove and eventually add again a cursor (after moving
       * it forward).
       */
      final ReplicationDBCursor cursor = sortedCursors.pollFirst();
      final UpdateMsg result = cursor.getChange();
      cursor.next();
      addCursorIfNotEmpty(sortedCursors, cursor);
      return result;
   }
   private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
         ReplicationDBCursor cursor)
   {
      if (cursor != null)
      {
         if (cursor.getChange() != null)
         {
            cursors.add(cursor);
         }
         else
         {
            close(cursor);
         }
      }
   }
  /**
   * Get the older Change Number for that server.
@@ -449,57 +458,51 @@
          We may be at the very moment when the writer has emptied the
          lateQueue when it sent the last update. The writer will fill again
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from
          the db.
          there. So let's take the last change not sent directly from the db.
          */
          SortedSet<ReplicationIterator> iteratorSortedSet = null;
          try
          {
            iteratorSortedSet = collectAllIteratorsWithChanges();
            UpdateMsg msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          } catch (Exception e)
          {
            result = null;
          } finally
          {
            close(iteratorSortedSet);
          }
               result = findOldestChangeNumberFromReplicationDBs();
        }
      }
    }
    return result;
  }
  private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
  {
    SortedSet<ReplicationIterator> results =
        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
   private ChangeNumber findOldestChangeNumberFromReplicationDBs()
   {
      SortedSet<ReplicationDBCursor> sortedCursors = null;
      try
      {
        sortedCursors = collectAllCursorsWithChanges();
         UpdateMsg msg = sortedCursors.first().getChange();
         return msg.getChangeNumber();
      }
      catch (Exception e)
      {
         return null;
      }
      finally
      {
        close(sortedCursors);
      }
   }
    // Build a list of candidates iterator (i.e. db i.e. server)
   /**
    * Collects all the replication DB cursors that have changes and sort them
    * with the oldest {@link ChangeNumber} first.
    *
    * @return a List of cursors with changes sorted by their {@link ChangeNumber}
    *         (oldest first)
    */
   private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  {
      final NavigableSet<ReplicationDBCursor> results =
            new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server
      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
      // get an iterator in this server db from that last change
      ReplicationIterator iter =
        replicationServerDomain.getChangelogIterator(serverId, lastCsn);
      /*
      if that iterator has changes, then it is a candidate
      it is added in the sorted list at a position given by its
      current change (see ReplicationIteratorComparator).
      */
      if (iter != null)
      {
        if (iter.getChange() != null)
        {
          results.add(iter);
        }
        else
        {
          close(iter);
        }
      }
      // get the last already sent CN from that server to get a cursor
         final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
         addCursorIfNotEmpty(results,
               replicationServerDomain.getCursorFrom(serverId, lastCsn));
    }
    return results;
  }