mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
10.32.2013 b4a1565a2ab3cd0192a1b17c026f16e151fd04ca
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -54,21 +54,31 @@
{
  /**
   * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
   * replication domain, advancing from the oldest to the newest change cross
   * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
   * a replication domain, advancing from the oldest to the newest change cross
   * all replicaDBs.
   */
  private final class CrossReplicaDBCursor implements ReplicaDBCursor
  private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
  {
    private final DN baseDN;
    private UpdateMsg currentChange;
    /**
     * The cursors are sorted based on the current change of each cursor to
     * consider the next change across all replicaDBs.
     */
    private final NavigableSet<ReplicaDBCursor> cursors =
        new TreeSet<ReplicaDBCursor>();
    private final DN baseDN;
    private final NavigableSet<DBCursor<UpdateMsg>> cursors =
        new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
        {
          @Override
          public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
          {
            final CSN csn1 = o1.getRecord().getCSN();
            final CSN csn2 = o2.getRecord().getCSN();
            return CSN.compare(csn1, csn2);
          }
        });
    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
    {
@@ -81,7 +91,7 @@
      }
    }
    private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
    private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
        CSN startAfterCSN)
    {
      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -89,7 +99,8 @@
      {
        try
        {
          ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
          DBCursor<UpdateMsg> cursor =
              replicaDB.generateCursorFrom(startAfterCSN);
          cursor.next();
          return cursor;
        }
@@ -112,16 +123,16 @@
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove and eventually add again a cursor (after moving it forward).
      final ReplicaDBCursor cursor = cursors.pollFirst();
      currentChange = cursor.getChange();
      final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
      currentChange = cursor.getRecord();
      cursor.next();
      addCursorIfNotEmpty(cursor);
      return true;
    }
    void addCursorIfNotEmpty(ReplicaDBCursor cursor)
    void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
    {
      if (cursor.getChange() != null)
      if (cursor.getRecord() != null)
      {
        cursors.add(cursor);
      }
@@ -132,7 +143,7 @@
    }
    @Override
    public UpdateMsg getChange()
    public UpdateMsg getRecord()
    {
      return currentChange;
    }
@@ -143,15 +154,6 @@
      StaticUtils.close(cursors);
    }
    @Override
    public int compareTo(ReplicaDBCursor o)
    {
      final CSN csn1 = getChange().getCSN();
      final CSN csn2 = o.getChange().getCSN();
      return CSN.compare(csn1, csn2);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
@@ -184,27 +186,18 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private static final ReplicaDBCursor EMPTY_CURSOR = new ReplicaDBCursor()
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
      new DBCursor<UpdateMsg>()
  {
    @Override
    public int compareTo(ReplicaDBCursor o)
    {
      if (o == null)
      {
        throw new NullPointerException(); // as per javadoc
      }
      return o == this ? 0 : -1; // equal to self, but less than all the rest
    }
    @Override
    public boolean next()
    {
      return false;
    }
    @Override
    public UpdateMsg getChange()
    public UpdateMsg getRecord()
    {
      return null;
    }
@@ -218,7 +211,7 @@
    @Override
    public String toString()
    {
      return "EmptyReplicaDBCursor";
      return "EmptyDBCursor<UpdateMsg>";
    }
  };
@@ -670,7 +663,7 @@
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN)
  {
    // Builds a new serverState for all the serverIds in the replication domain
    // to ensure we get cursors starting after the provided CSN.
@@ -679,7 +672,7 @@
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN,
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState)
  {
    return new CrossReplicaDBCursor(baseDN, startAfterServerState);