mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
13.06.2013 d24edab25bfe93a3e3ea4eb38fb9860cef7aa034
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@
  protected static final DebugTracer TRACER = getTracer();
  /**
   * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
   * a replication domain, advancing from the oldest to the newest change cross
   * all replicaDBs.
   */
  private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
  {
    private final DN baseDN;
    private UpdateMsg currentChange;
    /**
     * The cursors are sorted based on the current change of each cursor to
     * consider the next change across all replicaDBs.
     */
    private final NavigableSet<DBCursor<UpdateMsg>> cursors =
        new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
        {
          @Override
          public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
          {
            final CSN csn1 = o1.getRecord().getCSN();
            final CSN csn2 = o2.getRecord().getCSN();
            return CSN.compare(csn1, csn2);
          }
        });
    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
        throws ChangelogException
    {
      this.baseDN = baseDN;
      for (int serverId : getDomainMap(baseDN).keySet())
      {
        // get the last already sent CSN from that server to get a cursor
        final CSN lastCSN = startAfterServerState.getCSN(serverId);
        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
      }
    }
    private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
        CSN startAfterCSN) throws ChangelogException
    {
      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
      if (replicaDB != null)
      {
        DBCursor<UpdateMsg> cursor =
            replicaDB.generateCursorFrom(startAfterCSN);
        cursor.next();
        return cursor;
      }
      return EMPTY_CURSOR;
    }
    @Override
    public boolean next() throws ChangelogException
    {
      if (cursors.isEmpty())
      {
        currentChange = null;
        return false;
      }
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove and eventually add again a cursor (after moving it forward).
      final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
      currentChange = cursor.getRecord();
      cursor.next();
      addCursorIfNotEmpty(cursor);
      return true;
    }
    void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
    {
      if (cursor.getRecord() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        StaticUtils.close(cursor);
      }
    }
    @Override
    public UpdateMsg getRecord()
    {
      return currentChange;
    }
    @Override
    public void close()
    {
      StaticUtils.close(cursors);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return getClass().getSimpleName() + " baseDN=" + baseDN
          + " currentChange=" + currentChange + " open cursors=" + cursors;
    }
  }
  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
@@ -790,7 +686,29 @@
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState) throws ChangelogException
  {
    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final List<DBCursor<UpdateMsg>> cursors =
        new ArrayList<DBCursor<UpdateMsg>>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState.getCSN(serverId);
      cursors.add(getCursorFrom(baseDN, serverId, lastCSN));
    }
    return new CompositeDBCursor(cursors);
  }
  private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN) throws ChangelogException
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
      cursor.next();
      return cursor;
    }
    return EMPTY_CURSOR;
  }
  private ServerState buildServerState(DN baseDN, CSN startAfterCSN)