mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.07.2013 a634c8d90fc2581a9486d91df07e874dda33b69e
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
  {
    while (activeConsumer)
    {
         UpdateMsg msg;
      if (!following)
      {
        /* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
           *           restart as usual
           *   load this change on the delayList
           */
               NavigableSet<ReplicationDBCursor> sortedCursors = null;
          NavigableSet<ReplicationDBCursor> sortedCursors = null;
          try
          {
            sortedCursors = collectAllCursorsWithChanges();
                  // fill the lateQueue
            // fill the lateQueue
            while (!sortedCursors.isEmpty()
                && lateQueue.count() < 100
                && lateQueue.bytesCount() < 50000)
            {
                     lateQueue.add(nextOldestUpdateMsg(sortedCursors));
              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
            }
          }
          finally
@@ -315,15 +314,15 @@
                following = true;
              }
            }
               }
               else
          }
          else
          {
            /*
             * if the first change in the lateQueue is also on the regular
             * queue, we can resume the processing from the regular queue
             * -> set following to true and empty the lateQueue.
             */
            msg = lateQueue.first();
            UpdateMsg msg = lateQueue.first();
            synchronized (msgQueue)
            {
              if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
              }
            }
          }
            }
            else
        }
        else
        {
               // get the next change from the lateQueue
          // get the next change from the lateQueue
          UpdateMsg msg;
          synchronized (msgQueue)
          {
            msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
          return msg;
        }
      }
      synchronized (msgQueue)
      {
        if (following)
@@ -371,8 +373,7 @@
          {
            return null;
          }
          msg = msgQueue.removeFirst();
          UpdateMsg msg = msgQueue.removeFirst();
          if (updateServerState(msg))
          {
            /*
@@ -393,38 +394,38 @@
    return null;
  }
   private UpdateMsg nextOldestUpdateMsg(
         NavigableSet<ReplicationDBCursor> sortedCursors)
   {
      /*
       * The cursors are sorted based on the currentChange of each cursor to
       * consider the next change across all servers.
       * To keep consistent the order of the cursors in the SortedSet,
       * it is necessary to remove and eventually add again a cursor (after moving
       * it forward).
       */
      final ReplicationDBCursor cursor = sortedCursors.pollFirst();
      final UpdateMsg result = cursor.getChange();
      cursor.next();
      addCursorIfNotEmpty(sortedCursors, cursor);
      return result;
   }
  private UpdateMsg nextOldestUpdateMsg(
      NavigableSet<ReplicationDBCursor> sortedCursors)
  {
    /*
     * The cursors are sorted based on the currentChange of each cursor to
     * consider the next change across all servers.
     * To keep consistent the order of the cursors in the SortedSet,
     * it is necessary to remove and eventually add again a cursor (after moving
     * it forward).
     */
    final ReplicationDBCursor cursor = sortedCursors.pollFirst();
    final UpdateMsg result = cursor.getChange();
    cursor.next();
    addCursorIfNotEmpty(sortedCursors, cursor);
    return result;
  }
   private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
         ReplicationDBCursor cursor)
   {
      if (cursor != null)
      {
         if (cursor.getChange() != null)
         {
            cursors.add(cursor);
         }
         else
         {
            close(cursor);
         }
      }
   }
  private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
      ReplicationDBCursor cursor)
  {
    if (cursor != null)
    {
      if (cursor.getChange() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        close(cursor);
      }
    }
  }
  /**
   * Get the older Change Number for that server.
@@ -460,49 +461,49 @@
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from the db.
          */
               result = findOldestChangeNumberFromReplicationDBs();
          result = findOldestChangeNumberFromReplicationDBs();
        }
      }
    }
    return result;
  }
   private ChangeNumber findOldestChangeNumberFromReplicationDBs()
   {
      SortedSet<ReplicationDBCursor> sortedCursors = null;
      try
      {
        sortedCursors = collectAllCursorsWithChanges();
         UpdateMsg msg = sortedCursors.first().getChange();
         return msg.getChangeNumber();
      }
      catch (Exception e)
      {
         return null;
      }
      finally
      {
        close(sortedCursors);
      }
   }
   /**
    * Collects all the replication DB cursors that have changes and sort them
    * with the oldest {@link ChangeNumber} first.
    *
    * @return a List of cursors with changes sorted by their {@link ChangeNumber}
    *         (oldest first)
    */
   private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  private ChangeNumber findOldestChangeNumberFromReplicationDBs()
  {
      final NavigableSet<ReplicationDBCursor> results =
            new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    SortedSet<ReplicationDBCursor> sortedCursors = null;
    try
    {
      sortedCursors = collectAllCursorsWithChanges();
      UpdateMsg msg = sortedCursors.first().getChange();
      return msg.getChangeNumber();
    }
    catch (Exception e)
    {
      return null;
    }
    finally
    {
      close(sortedCursors);
    }
  }
  /**
   * Collects all the replication DB cursors that have changes and sort them
   * with the oldest {@link ChangeNumber} first.
   *
   * @return a List of cursors with changes sorted by their {@link ChangeNumber}
   *         (oldest first)
   */
  private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  {
    final NavigableSet<ReplicationDBCursor> results =
        new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server to get a cursor
         final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
         addCursorIfNotEmpty(results,
               replicationServerDomain.getCursorFrom(serverId, lastCsn));
      final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));
    }
    return results;
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -49,7 +49,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.replication.server.changelog.je.ReplicationDB;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -1263,17 +1262,17 @@
  }
  /**
    * Creates and returns a cursor. When the cursor is not used anymore, the
    * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
    * resources and locks used by the cursor.
    *
    * @param serverId
    *          Identifier of the server for which the cursor is created.
    * @param startAfterCN
    *          Starting point for the cursor.
    * @return the created {@link ReplicationDB}. Null when no DB is available or
    *         the DB is empty for the provided serverId .
    */
   * Creates and returns a cursor. When the cursor is not used anymore, the
   * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
   * resources and locks used by the cursor.
   *
   * @param serverId
   *          Identifier of the server for which the cursor is created.
   * @param startAfterCN
   *          Starting point for the cursor.
   * @return the created {@link ReplicationDBCursor}. Null when no DB is
   *         available or the DB is empty for the provided serverId .
   */
  public ReplicationDBCursor getCursorFrom(int serverId,
      ChangeNumber startAfterCN)
  {
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
@@ -38,24 +38,24 @@
{
  /**
    * Get the UpdateMsg where the cursor is currently set.
    *
    * @return The UpdateMsg where the cursor is currently set.
    */
   * Get the UpdateMsg where the cursor is currently set.
   *
   * @return The UpdateMsg where the cursor is currently set.
   */
  UpdateMsg getChange();
  /**
    * Go to the next change in the ReplicationDB or in the server Queue.
    *
    * @return false if the cursor is already on the last change before this call.
    */
   * Go to the next change in the ReplicationDB or in the server Queue.
   *
   * @return false if the cursor is already on the last change before this call.
   */
  boolean next();
  /**
    * Release the resources and locks used by this cursor. This method must be
    * called when the cursor is no longer used. Failure to do it could cause DB
    * deadlock.
    */
   * Release the resources and locks used by this cursor. This method must be
   * called when the cursor is no longer used. Failure to do it could cause DB
   * deadlock.
   */
  @Override
  void close();
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
@@ -30,7 +30,6 @@
import java.util.Comparator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
/**
 * This class defines a {@link Comparator} that allows to know which
@@ -41,14 +40,14 @@
              implements Comparator<ReplicationDBCursor>
{
  /**
    * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
    *
    * @param o1
    *          first cursor.
    * @param o2
    *          second cursor.
    * @return result of the comparison.
    */
   * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
   *
   * @param o1
   *          first cursor.
   * @param o2
   *          second cursor.
   * @return result of the comparison.
   */
  @Override
  public int compare(ReplicationDBCursor o1, ReplicationDBCursor o2)
  {
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,7 +44,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -260,18 +260,18 @@
  }
  /**
    * Generate a new {@link ReplicationDBCursor} that allows to browse the db
    * managed by this dbHandler and starting at the position defined by a given
    * changeNumber.
    *
    * @param startAfterCN
    *          The position where the cursor must start.
    * @return a new {@link ReplicationDBCursor} that allows to browse the db
    *         managed by this dbHandler and starting at the position defined by a
    *         given changeNumber.
    * @throws ChangelogException
    *           if a database problem happened.
    */
   * Generate a new {@link ReplicationDBCursor} that allows to browse the db
   * managed by this dbHandler and starting at the position defined by a given
   * changeNumber.
   *
   * @param startAfterCN
   *          The position where the cursor must start.
   * @return a new {@link ReplicationDBCursor} that allows to browse the db
   *         managed by this dbHandler and starting at the position defined by a
   *         given changeNumber.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  public ReplicationDBCursor generateCursorFrom(ChangeNumber startAfterCN)
      throws ChangelogException
  {
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
@@ -32,7 +32,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
/**
 * Berkeley DB JE implementation of {@link ReplicationDBCursor}.
@@ -46,18 +46,18 @@
  private ChangeNumber lastNonNullCurrentCN;
  /**
    * Creates a new JEReplicationDBCursor. All created cursor must be released by
    * the caller using the {@link #close()} method.
    *
    * @param db
    *          The db where the cursor must be created.
    * @param startAfterCN
    *          The ChangeNumber after which the cursor must start.
    * @param dbHandler
    *          The associated DbHandler.
    * @throws ChangelogException
    *           if a database problem happened.
    */
   * Creates a new JEReplicationDBCursor. All created cursor must be released by
   * the caller using the {@link #close()} method.
   *
   * @param db
   *          The db where the cursor must be created.
   * @param startAfterCN
   *          The ChangeNumber after which the cursor must start.
   * @param dbHandler
   *          The associated DbHandler.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  public JEReplicationDBCursor(ReplicationDB db, ChangeNumber startAfterCN,
      DbHandler dbHandler) throws ChangelogException
  {
@@ -151,10 +151,10 @@
  }
  /**
    * Called by the Gc when the object is garbage collected Release the internal
    * cursor in case the cursor was badly used and {@link #close()} was never
    * called.
    */
   * Called by the Gc when the object is garbage collected Release the internal
   * cursor in case the cursor was badly used and {@link #close()} was never
   * called.
   */
  @Override
  protected void finalize()
  {