mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.52.2013 d454b5f9a2b7dc4ef2a70cd983a26436568cbe04
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.messages.Message;
@@ -57,6 +54,114 @@
{
  /**
   * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
   * replication domain, advancing from the oldest to the newest change cross
   * all replicaDBs.
   */
  private final class CrossReplicaDBCursor implements ReplicaDBCursor
  {
    private UpdateMsg currentChange;
    /**
     * The cursors are sorted based on the current change of each cursor to
     * consider the next change across all replicaDBs.
     */
    private final NavigableSet<ReplicaDBCursor> cursors =
        new TreeSet<ReplicaDBCursor>();
    private final DN baseDN;
    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
    {
      this.baseDN = baseDN;
      for (int serverId : getDomainMap(baseDN).keySet())
      {
        // get the last already sent CSN from that server to get a cursor
        final CSN lastCSN = startAfterServerState.getCSN(serverId);
        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
      }
    }
    private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
        CSN startAfterCSN)
    {
      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
      if (replicaDB != null)
      {
        try
        {
          ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
          cursor.next();
          return cursor;
        }
        catch (ChangelogException e)
        {
          // ignored
        }
      }
      return EMPTY_CURSOR;
    }
    @Override
    public boolean next()
    {
      if (cursors.isEmpty())
      {
        currentChange = null;
        return false;
      }
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove and eventually add again a cursor (after moving it forward).
      final ReplicaDBCursor cursor = cursors.pollFirst();
      currentChange = cursor.getChange();
      cursor.next();
      addCursorIfNotEmpty(cursor);
      return true;
    }
    void addCursorIfNotEmpty(ReplicaDBCursor cursor)
    {
      if (cursor.getChange() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        StaticUtils.close(cursor);
      }
    }
    @Override
    public UpdateMsg getChange()
    {
      return currentChange;
    }
    @Override
    public void close()
    {
      StaticUtils.close(cursors);
    }
    @Override
    public int compareTo(ReplicaDBCursor o)
    {
      final CSN csn1 = getChange().getCSN();
      final CSN csn2 = o.getChange().getCSN();
      return CSN.compare(csn1, csn2);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return getClass().getSimpleName() + " baseDN=" + baseDN
          + " currentChange=" + currentChange + " open cursors=" + cursors;
    }
  }
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
@@ -109,6 +214,12 @@
    {
      // empty
    }
    @Override
    public String toString()
    {
      return "EmptyReplicaDBCursor";
    }
  };
  /**
@@ -368,13 +479,6 @@
  /** {@inheritDoc} */
  @Override
  public Set<Integer> getDomainServerIds(DN baseDN)
  {
    return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
  }
  /** {@inheritDoc} */
  @Override
  public long getCount(DN baseDN, int serverId, CSN from, CSN to)
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -565,24 +669,45 @@
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN)
  public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    // Builds a new serverState for all the serverIds in the replication domain
    // to ensure we get cursors starting after the provided CSN.
    return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN));
  }
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN,
      ServerState startAfterServerState)
  {
    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
  }
  private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
  {
    final ServerState result = new ServerState();
    if (startAfterCSN == null)
    {
      try
      return result;
    }
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      if (serverId == startAfterCSN.getServerId())
      {
        ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
        cursor.next();
        return cursor;
        // reuse the provided CSN one as it is the most accurate
        result.update(startAfterCSN);
      }
      catch (ChangelogException e)
      else
      {
        // ignored
        // build a new CSN, ignoring the seqNum since it is irrelevant for
        // a different serverId
        final CSN csn = startAfterCSN; // only used for increased readability
        result.update(new CSN(csn.getTime(), 0, serverId));
      }
    }
    return EMPTY_CURSOR;
    return result;
  }
  /** {@inheritDoc} */