mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.52.2013 4936231f6b43a59233dc4ee909d9a2eeb3ced31a
OPENDJ-1116 Introduce abstraction for the changelog DB


Leveraged the ReplicaDBCursor interface to hide iterations cross replica DBs from client code.
Net benefit: more coherent code + reduced coupling between changelogDB and ECL code.
There is an added benefit: searching on cn=changelog now returns changes in order.

The change was implemented by moving code from MessageHandler to JEChangelogDB.


MessageHandler.java:
Removed nextOldestUpdateMsg(), addCursorIfNotEmpty() and collectAllCursorsWithChanges().
Extracted method isLateQueueBelowThreshold().

ReplicationServerDomain.java
Removed getServerIds() and getCursorFrom(int, CSN)
Added getCursorFrom(CSN) and getCursorFrom(ServerState).

ReplicationDomainDB.java:
Removed getDomainServerIds() and getCursorFrom(DN, int, CSN)
Added getCursorFrom(DN, CSN) and getCursorFrom(DN, ServerState).

JEChangelogDB.java
Added inenr class CrossReplicaDBCursor + moved getCursorFrom(DN, int, CSN) here from enclosing type.
Removed getDomainServerIds().
Added getCursorFrom(DN, CSN) and getCursorFrom(DN, ServerState).

ReplicationBackend.java:
In writeChangesAfterCSN(), removed loop on the serverIds + renamed "rsd" to "rsDomain".

JEReplicaDB.java, JEReplicaDBCursor.java:
Implemented toString().
7 files modified
407 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/MessageHandler.java 76 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 59 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 45 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 47 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 169 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@
 */
package org.opends.server.replication.server;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
@@ -294,22 +295,19 @@
           *           restart as usual
           *   load this change on the delayList
           */
          NavigableSet<ReplicaDBCursor> sortedCursors = null;
          ReplicaDBCursor cursor = null;
          try
          {
            sortedCursors = collectAllCursorsWithChanges();
            // fill the lateQueue
            while (!sortedCursors.isEmpty()
                && lateQueue.count() < 100
                && lateQueue.bytesCount() < 50000)
            cursor = replicationServerDomain.getCursorFrom(serverState);
            while (cursor.next() && isLateQueueBelowThreshold())
            {
              lateQueue.add(nextOldestUpdateMsg(sortedCursors));
              lateQueue.add(cursor.getChange());
            }
          }
          finally
          {
            close(sortedCursors);
            close(cursor);
          }
          /*
@@ -403,34 +401,9 @@
    return null;
  }
  private UpdateMsg nextOldestUpdateMsg(
      NavigableSet<ReplicaDBCursor> sortedCursors)
  private boolean isLateQueueBelowThreshold()
  {
    /*
     * The cursors are sorted based on the currentChange of each cursor to
     * consider the next change across all servers.
     * To keep consistent the order of the cursors in the SortedSet,
     * it is necessary to remove and eventually add again a cursor (after moving
     * it forward).
     */
    final ReplicaDBCursor cursor = sortedCursors.pollFirst();
    final UpdateMsg result = cursor.getChange();
    cursor.next();
    addCursorIfNotEmpty(sortedCursors, cursor);
    return result;
  }
  private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
      ReplicaDBCursor cursor)
  {
    if (cursor.getChange() != null)
    {
      cursors.add(cursor);
    }
    else
    {
      close(cursor);
    }
    return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
  }
  /**
@@ -476,12 +449,12 @@
  private CSN findOldestCSNFromReplicaDBs()
  {
    SortedSet<ReplicaDBCursor> sortedCursors = null;
    ReplicaDBCursor cursor = null;
    try
    {
      sortedCursors = collectAllCursorsWithChanges();
      UpdateMsg msg = sortedCursors.first().getChange();
      return msg.getCSN();
      cursor = replicationServerDomain.getCursorFrom(serverState);
      cursor.next();
      return cursor.getChange().getCSN();
    }
    catch (Exception e)
    {
@@ -489,32 +462,11 @@
    }
    finally
    {
      close(sortedCursors);
      close(cursor);
    }
  }
  /**
   * Collects all the {@link ReplicaDBCursor}s that have changes and sort them
   * with the oldest {@link CSN} first.
   *
   * @return a List of cursors with changes sorted by their {@link CSN}
   *         (oldest first)
   */
  private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
  {
    final NavigableSet<ReplicaDBCursor> results =
        new TreeSet<ReplicaDBCursor>();
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCsn = serverState.getCSN(serverId);
      addCursorIfNotEmpty(results,
          replicationServerDomain.getCursorFrom(serverId, lastCsn));
    }
    return results;
  }
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -79,8 +79,7 @@
 * <p>
 * Currently are only implemented the create and restore backup features.
 */
public class ReplicationBackend
       extends Backend
public class ReplicationBackend extends Backend
{
  private static final String CHANGE_NUMBER = "replicationChangeNumber";
@@ -620,43 +619,41 @@
   * Exports or returns all the changes from a ReplicationServerDomain coming
   * after the CSN specified in the searchOperation.
   */
  private void writeChangesAfterCSN(ReplicationServerDomain rsd,
  private void writeChangesAfterCSN(ReplicationServerDomain rsDomain,
      final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation, final CSN previousCSN)
  {
    for (int serverId : rsd.getServerIds())
    if (exportConfig != null && exportConfig.isCancelled())
    { // Abort if cancelled
      return;
    }
    ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN);
    try
    {
      if (exportConfig != null && exportConfig.isCancelled())
      { // Abort if cancelled
        return;
      }
      int lookthroughCount = 0;
      ReplicaDBCursor cursor = rsd.getCursorFrom(serverId, previousCSN);
      try
      // Walk through the changes
      cursor.next(); // first try to advance the cursor
      while (cursor.getChange() != null)
      {
        int lookthroughCount = 0;
        // Walk through the changes
        while (cursor.getChange() != null)
        {
          if (exportConfig != null && exportConfig.isCancelled())
          { // abort if cancelled
            return;
          }
          if (!canContinue(searchOperation, lookthroughCount))
          {
            break;
          }
          lookthroughCount++;
          writeChange(cursor.getChange(), ldifWriter, searchOperation,
              rsd.getBaseDN(), exportConfig != null);
          cursor.next();
        if (exportConfig != null && exportConfig.isCancelled())
        { // abort if cancelled
          return;
        }
        if (!canContinue(searchOperation, lookthroughCount))
        {
          break;
        }
        lookthroughCount++;
        writeChange(cursor.getChange(), ldifWriter, searchOperation,
            rsDomain.getBaseDN(), exportConfig != null);
        cursor.next();
      }
      finally
      {
        close(cursor);
      }
    }
    finally
    {
      close(cursor);
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1269,19 +1269,7 @@
  }
  /**
   * Returns a set containing the serverIds that produced updates and known by
   * this replicationServer from all over the topology, whether directly
   * connected or connected to another RS.
   *
   * @return a set containing the serverIds known by this replicationServer.
   */
  public Set<Integer> getServerIds()
  {
    return domainDB.getDomainServerIds(baseDN);
  }
  /**
   * Creates and returns a cursor.
   * Creates and returns a cursor across this replication domain.
   * <p>
   * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
   * to the next available record.
@@ -1290,16 +1278,35 @@
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   *
   * @param serverId
   *          Identifier of the server for which the cursor is created
   * @param startAfterCSN
   *          Starting point for the cursor. If null, start from the oldest CSN
   * @return a non null {@link ReplicaDBCursor}
   * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN)
   * @see ReplicationDomainDB#getCursorFrom(DN, CSN)
   */
  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
  public ReplicaDBCursor getCursorFrom(CSN startAfterCSN)
  {
    return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
    return domainDB.getCursorFrom(baseDN, startAfterCSN);
  }
  /**
   * Creates and returns a cursor across this replication domain.
   * <p>
   * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
   * to the next available record.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   *
   * @param startAfterServerState
   *          Starting point for the replicaDB cursors. If null, start from the
   *          oldest CSN
   * @return a non null {@link ReplicaDBCursor} going from oldest to newest CSN
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
   */
  public ReplicaDBCursor getCursorFrom(ServerState startAfterServerState)
  {
    return domainDB.getCursorFrom(baseDN, startAfterServerState);
  }
 /**
@@ -2720,7 +2727,7 @@
   */
  public void storeReceivedCTHeartbeat(CSN csn)
  {
    // TODO:May be we can spare processing by only storing CSN (timestamp)
    // TODO:Maybe we can spare processing by only storing CSN (timestamp)
    // instead of a server state.
    getChangeTimeHeartbeatState().update(csn);
  }
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,6 @@
 */
package org.opends.server.replication.server.changelog.api;
import java.util.Set;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -41,16 +39,6 @@
{
  /**
   * Returns the serverIds for the servers that are or have been part of the
   * provided replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return an unmodifiable set of integers holding the serverIds
   */
  Set<Integer> getDomainServerIds(DN baseDN);
  /**
   * Get the number of changes for the specified replication domain.
   *
   * @param baseDN
@@ -171,8 +159,9 @@
  long getCount(DN baseDN, int serverId, CSN from, CSN to);
  /**
   * Generates a {@link ReplicaDBCursor} for the specified serverId and
   * replication domain starting after the provided CSN.
   * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
   * specified replication domain, with all cursors starting after the provided
   * CSN.
   * <p>
   * The cursor is already advanced to the record after startAfterCSN.
   * <p>
@@ -182,13 +171,35 @@
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          Identifier of the server for which the cursor is created
   * @param startAfterCSN
   *          Starting point for the cursor. If null, start from the oldest CSN
   *          Starting point for each ReplicaDB cursor. If null, start from the
   *          oldest CSN for each ReplicaDB cursor.
   * @return a non null {@link ReplicaDBCursor}
   * @see #getCursorFrom(DN, ServerState)
   */
  ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
  ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN);
  /**
   * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
   * specified replication domain starting after the provided
   * {@link ServerState} for each replicaDBs.
   * <p>
   * The cursor is already advanced to the records after the serverState.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param startAfterServerState
   *          Starting point for each ReplicaDB cursor. If any CSN for a
   *          replicaDB is null, then start from the oldest CSN for this
   *          replicaDB
   * @return a non null {@link ReplicaDBCursor}
   * @see #getCursorFrom(DN, CSN)
   */
  ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState);
  /**
   * for the specified serverId and replication domain.
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.messages.Message;
@@ -57,6 +54,114 @@
{
  /**
   * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
   * replication domain, advancing from the oldest to the newest change cross
   * all replicaDBs.
   */
  private final class CrossReplicaDBCursor implements ReplicaDBCursor
  {
    private UpdateMsg currentChange;
    /**
     * The cursors are sorted based on the current change of each cursor to
     * consider the next change across all replicaDBs.
     */
    private final NavigableSet<ReplicaDBCursor> cursors =
        new TreeSet<ReplicaDBCursor>();
    private final DN baseDN;
    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
    {
      this.baseDN = baseDN;
      for (int serverId : getDomainMap(baseDN).keySet())
      {
        // get the last already sent CSN from that server to get a cursor
        final CSN lastCSN = startAfterServerState.getCSN(serverId);
        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
      }
    }
    private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
        CSN startAfterCSN)
    {
      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
      if (replicaDB != null)
      {
        try
        {
          ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
          cursor.next();
          return cursor;
        }
        catch (ChangelogException e)
        {
          // ignored
        }
      }
      return EMPTY_CURSOR;
    }
    @Override
    public boolean next()
    {
      if (cursors.isEmpty())
      {
        currentChange = null;
        return false;
      }
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove and eventually add again a cursor (after moving it forward).
      final ReplicaDBCursor cursor = cursors.pollFirst();
      currentChange = cursor.getChange();
      cursor.next();
      addCursorIfNotEmpty(cursor);
      return true;
    }
    void addCursorIfNotEmpty(ReplicaDBCursor cursor)
    {
      if (cursor.getChange() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        StaticUtils.close(cursor);
      }
    }
    @Override
    public UpdateMsg getChange()
    {
      return currentChange;
    }
    @Override
    public void close()
    {
      StaticUtils.close(cursors);
    }
    @Override
    public int compareTo(ReplicaDBCursor o)
    {
      final CSN csn1 = getChange().getCSN();
      final CSN csn2 = o.getChange().getCSN();
      return CSN.compare(csn1, csn2);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return getClass().getSimpleName() + " baseDN=" + baseDN
          + " currentChange=" + currentChange + " open cursors=" + cursors;
    }
  }
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
@@ -109,6 +214,12 @@
    {
      // empty
    }
    @Override
    public String toString()
    {
      return "EmptyReplicaDBCursor";
    }
  };
  /**
@@ -368,13 +479,6 @@
  /** {@inheritDoc} */
  @Override
  public Set<Integer> getDomainServerIds(DN baseDN)
  {
    return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
  }
  /** {@inheritDoc} */
  @Override
  public long getCount(DN baseDN, int serverId, CSN from, CSN to)
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -565,24 +669,45 @@
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN)
  public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    // Builds a new serverState for all the serverIds in the replication domain
    // to ensure we get cursors starting after the provided CSN.
    return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN));
  }
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN,
      ServerState startAfterServerState)
  {
    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
  }
  private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
  {
    final ServerState result = new ServerState();
    if (startAfterCSN == null)
    {
      try
      return result;
    }
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      if (serverId == startAfterCSN.getServerId())
      {
        ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
        cursor.next();
        return cursor;
        // reuse the provided CSN one as it is the most accurate
        result.update(startAfterCSN);
      }
      catch (ChangelogException e)
      else
      {
        // ignored
        // build a new CSN, ignoring the seqNum since it is irrelevant for
        // a different serverId
        final CSN csn = startAfterCSN; // only used for increased readability
        result.update(new CSN(csn.getTime(), 0, serverId));
      }
    }
    return EMPTY_CURSOR;
    return result;
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -584,7 +584,8 @@
  @Override
  public String toString()
  {
    return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
        + oldestCSN + " " + newestCSN;
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -166,4 +166,12 @@
    return CSN.compare(csn1, csn2);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentChange=" + currentChange + ""
        + replicaDB;
  }
}