mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.07.2013 9603e9270e1350abd3248e6187d4d4bb0a8fee1d
OPENDJ-1116 Introduce abstraction for the changelog DB

Fixed checkstyle.
Oh Eclipse, why did you fail me?
6 files modified
277 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/MessageHandler.java 153 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java 24 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java 26 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java 34 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
  {
    while (activeConsumer)
    {
            UpdateMsg msg;
      if (!following)
      {
        /* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
           *           restart as usual
           *   load this change on the delayList
           */
                    NavigableSet<ReplicationDBCursor> sortedCursors = null;
          NavigableSet<ReplicationDBCursor> sortedCursors = null;
          try
          {
            sortedCursors = collectAllCursorsWithChanges();
                        // fill the lateQueue
            // fill the lateQueue
            while (!sortedCursors.isEmpty()
                && lateQueue.count() < 100
                && lateQueue.bytesCount() < 50000)
            {
                            lateQueue.add(nextOldestUpdateMsg(sortedCursors));
              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
            }
          }
          finally
@@ -315,15 +314,15 @@
                following = true;
              }
            }
                    }
                    else
          }
          else
          {
            /*
             * if the first change in the lateQueue is also on the regular
             * queue, we can resume the processing from the regular queue
             * -> set following to true and empty the lateQueue.
             */
            msg = lateQueue.first();
            UpdateMsg msg = lateQueue.first();
            synchronized (msgQueue)
            {
              if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
              }
            }
          }
                }
                else
        }
        else
        {
                    // get the next change from the lateQueue
          // get the next change from the lateQueue
          UpdateMsg msg;
          synchronized (msgQueue)
          {
            msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
          return msg;
        }
      }
      synchronized (msgQueue)
      {
        if (following)
@@ -371,8 +373,7 @@
          {
            return null;
          }
          msg = msgQueue.removeFirst();
          UpdateMsg msg = msgQueue.removeFirst();
          if (updateServerState(msg))
          {
            /*
@@ -393,38 +394,38 @@
    return null;
  }
    private UpdateMsg nextOldestUpdateMsg(
            NavigableSet<ReplicationDBCursor> sortedCursors)
    {
        /*
         * The cursors are sorted based on the currentChange of each cursor to
         * consider the next change across all servers.
         * To keep consistent the order of the cursors in the SortedSet,
         * it is necessary to remove and eventually add again a cursor (after moving
         * it forward).
         */
        final ReplicationDBCursor cursor = sortedCursors.pollFirst();
        final UpdateMsg result = cursor.getChange();
        cursor.next();
        addCursorIfNotEmpty(sortedCursors, cursor);
        return result;
    }
  private UpdateMsg nextOldestUpdateMsg(
      NavigableSet<ReplicationDBCursor> sortedCursors)
  {
    /*
     * The cursors are sorted based on the currentChange of each cursor to
     * consider the next change across all servers.
     * To keep consistent the order of the cursors in the SortedSet,
     * it is necessary to remove and eventually add again a cursor (after moving
     * it forward).
     */
    final ReplicationDBCursor cursor = sortedCursors.pollFirst();
    final UpdateMsg result = cursor.getChange();
    cursor.next();
    addCursorIfNotEmpty(sortedCursors, cursor);
    return result;
  }
    private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
            ReplicationDBCursor cursor)
    {
        if (cursor != null)
        {
            if (cursor.getChange() != null)
            {
                cursors.add(cursor);
            }
            else
            {
                close(cursor);
            }
        }
    }
  private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
      ReplicationDBCursor cursor)
  {
    if (cursor != null)
    {
      if (cursor.getChange() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        close(cursor);
      }
    }
  }
  /**
   * Get the older Change Number for that server.
@@ -460,49 +461,49 @@
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from the db.
          */
                    result = findOldestChangeNumberFromReplicationDBs();
          result = findOldestChangeNumberFromReplicationDBs();
        }
      }
    }
    return result;
  }
    private ChangeNumber findOldestChangeNumberFromReplicationDBs()
    {
        SortedSet<ReplicationDBCursor> sortedCursors = null;
        try
        {
          sortedCursors = collectAllCursorsWithChanges();
            UpdateMsg msg = sortedCursors.first().getChange();
            return msg.getChangeNumber();
        }
        catch (Exception e)
        {
            return null;
        }
        finally
        {
          close(sortedCursors);
        }
    }
    /**
     * Collects all the replication DB cursors that have changes and sort them
     * with the oldest {@link ChangeNumber} first.
     *
     * @return a List of cursors with changes sorted by their {@link ChangeNumber}
     *         (oldest first)
     */
    private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  private ChangeNumber findOldestChangeNumberFromReplicationDBs()
  {
        final NavigableSet<ReplicationDBCursor> results =
                new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    SortedSet<ReplicationDBCursor> sortedCursors = null;
    try
    {
      sortedCursors = collectAllCursorsWithChanges();
      UpdateMsg msg = sortedCursors.first().getChange();
      return msg.getChangeNumber();
    }
    catch (Exception e)
    {
      return null;
    }
    finally
    {
      close(sortedCursors);
    }
  }
  /**
   * Collects all the replication DB cursors that have changes and sort them
   * with the oldest {@link ChangeNumber} first.
   *
   * @return a List of cursors with changes sorted by their {@link ChangeNumber}
   *         (oldest first)
   */
  private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
  {
    final NavigableSet<ReplicationDBCursor> results =
        new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server to get a cursor
            final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
            addCursorIfNotEmpty(results,
                    replicationServerDomain.getCursorFrom(serverId, lastCsn));
      final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));
    }
    return results;
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -49,7 +49,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.replication.server.changelog.je.ReplicationDB;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -1263,17 +1262,17 @@
  }
  /**
     * Creates and returns a cursor. When the cursor is not used anymore, the
     * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
     * resources and locks used by the cursor.
     *
     * @param serverId
     *          Identifier of the server for which the cursor is created.
     * @param startAfterCN
     *          Starting point for the cursor.
     * @return the created {@link ReplicationDB}. Null when no DB is available or
     *         the DB is empty for the provided serverId .
     */
   * Creates and returns a cursor. When the cursor is not used anymore, the
   * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
   * resources and locks used by the cursor.
   *
   * @param serverId
   *          Identifier of the server for which the cursor is created.
   * @param startAfterCN
   *          Starting point for the cursor.
   * @return the created {@link ReplicationDBCursor}. Null when no DB is
   *         available or the DB is empty for the provided serverId .
   */
  public ReplicationDBCursor getCursorFrom(int serverId,
      ChangeNumber startAfterCN)
  {
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
@@ -38,24 +38,24 @@
{
  /**
     * Get the UpdateMsg where the cursor is currently set.
     *
     * @return The UpdateMsg where the cursor is currently set.
     */
   * Get the UpdateMsg where the cursor is currently set.
   *
   * @return The UpdateMsg where the cursor is currently set.
   */
  UpdateMsg getChange();
  /**
     * Go to the next change in the ReplicationDB or in the server Queue.
     *
     * @return false if the cursor is already on the last change before this call.
     */
   * Go to the next change in the ReplicationDB or in the server Queue.
   *
   * @return false if the cursor is already on the last change before this call.
   */
  boolean next();
  /**
     * Release the resources and locks used by this cursor. This method must be
     * called when the cursor is no longer used. Failure to do it could cause DB
     * deadlock.
     */
   * Release the resources and locks used by this cursor. This method must be
   * called when the cursor is no longer used. Failure to do it could cause DB
   * deadlock.
   */
  @Override
  void close();
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
@@ -30,7 +30,6 @@
import java.util.Comparator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
/**
 * This class defines a {@link Comparator} that allows to know which
@@ -41,14 +40,14 @@
              implements Comparator<ReplicationDBCursor>
{
  /**
     * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
     *
     * @param o1
     *          first cursor.
     * @param o2
     *          second cursor.
     * @return result of the comparison.
     */
   * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
   *
   * @param o1
   *          first cursor.
   * @param o2
   *          second cursor.
   * @return result of the comparison.
   */
  @Override
  public int compare(ReplicationDBCursor o1, ReplicationDBCursor o2)
  {
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,7 +44,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -260,18 +260,18 @@
  }
  /**
     * Generate a new {@link ReplicationDBCursor} that allows to browse the db
     * managed by this dbHandler and starting at the position defined by a given
     * changeNumber.
     *
     * @param startAfterCN
     *          The position where the cursor must start.
     * @return a new {@link ReplicationDBCursor} that allows to browse the db
     *         managed by this dbHandler and starting at the position defined by a
     *         given changeNumber.
     * @throws ChangelogException
     *           if a database problem happened.
     */
   * Generate a new {@link ReplicationDBCursor} that allows to browse the db
   * managed by this dbHandler and starting at the position defined by a given
   * changeNumber.
   *
   * @param startAfterCN
   *          The position where the cursor must start.
   * @return a new {@link ReplicationDBCursor} that allows to browse the db
   *         managed by this dbHandler and starting at the position defined by a
   *         given changeNumber.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  public ReplicationDBCursor generateCursorFrom(ChangeNumber startAfterCN)
      throws ChangelogException
  {
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
@@ -32,7 +32,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
/**
 * Berkeley DB JE implementation of {@link ReplicationDBCursor}.
@@ -46,18 +46,18 @@
  private ChangeNumber lastNonNullCurrentCN;
  /**
     * Creates a new JEReplicationDBCursor. All created cursor must be released by
     * the caller using the {@link #close()} method.
     *
     * @param db
     *          The db where the cursor must be created.
     * @param startAfterCN
     *          The ChangeNumber after which the cursor must start.
     * @param dbHandler
     *          The associated DbHandler.
     * @throws ChangelogException
     *           if a database problem happened.
     */
   * Creates a new JEReplicationDBCursor. All created cursor must be released by
   * the caller using the {@link #close()} method.
   *
   * @param db
   *          The db where the cursor must be created.
   * @param startAfterCN
   *          The ChangeNumber after which the cursor must start.
   * @param dbHandler
   *          The associated DbHandler.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  public JEReplicationDBCursor(ReplicationDB db, ChangeNumber startAfterCN,
      DbHandler dbHandler) throws ChangelogException
  {
@@ -151,10 +151,10 @@
  }
  /**
     * Called by the Gc when the object is garbage collected Release the internal
     * cursor in case the cursor was badly used and {@link #close()} was never
     * called.
     */
   * Called by the Gc when the object is garbage collected Release the internal
   * cursor in case the cursor was badly used and {@link #close()} was never
   * called.
   */
  @Override
  protected void finalize()
  {