mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
24.51.2009 5cbf2ca5558c4e978745f7d8b9197ba978faf1cb
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -40,6 +40,9 @@
{
  private UpdateMsg currentChange = null;
  private ReplServerDBCursor cursor = null;
  private DbHandler dbh;
  private ReplicationDB db;
  ChangeNumber lastNonNullCurrentCN;
  /**
   * Creates a new ReplicationIterator.
@@ -49,18 +52,40 @@
   * @param id the Identifier of the server on which the iterator applies.
   * @param db The db where the iterator must be created.
   * @param changeNumber The ChangeNumber after which the iterator must start.
   * @param dbh The associated DbHandler.
   * @throws Exception If there is no other change to push after change
   *         with changeNumber number.
   * @throws DatabaseException if a database problem happened.
   */
  public ReplicationIterator(
          short id, ReplicationDB db, ChangeNumber changeNumber)
          short id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
          throws Exception, DatabaseException
  {
    cursor = db.openReadCursor(changeNumber);
    this.db = db;
    this.dbh = dbh;
    this.lastNonNullCurrentCN = changeNumber;
    try
    {
      cursor = db.openReadCursor(changeNumber);
    }
    catch(Exception e)
    {
      // we didn't find it in the db
      cursor = null;
    }
    if (cursor == null)
    {
      throw new Exception("no new change");
      // flush the queue into the db
      dbh.flush();
      // look again in the db
      cursor = db.openReadCursor(changeNumber);
      if (cursor == null)
      {
        throw new Exception("no new change");
      }
    }
  }
@@ -80,18 +105,47 @@
   */
  public boolean next()
  {
    currentChange = cursor.next();
    boolean hasNext = false;
    currentChange = cursor.next(); // can return null
    if (currentChange != null)
      return true;
    {
      lastNonNullCurrentCN = currentChange.getChangeNumber();
      hasNext = true;
    }
    else
    {
      // TODO : should check here if some changes are still in the
      // dbHandler message queue and not yet saved to the backing database
      // if yes should get change from there from now on.
      return false;
      synchronized (this)
      {
        if (cursor != null)
        {
          cursor.close();
          cursor = null;
        }
        dbh.flush();
        try
        {
          cursor = db.openReadCursor(lastNonNullCurrentCN);
          currentChange = cursor.next(); // can return null
          lastNonNullCurrentCN = currentChange.getChangeNumber();
          if (currentChange != null)
          {
            hasNext = true;
          }
          else
          {
            hasNext = false;
          }
        }
        catch(Exception e)
        {
          currentChange = null;
          hasNext = false;
        }
      }
    }
    return hasNext;
  }
  /**
@@ -108,6 +162,8 @@
        cursor.close();
        cursor = null;
      }
      this.dbh = null;
      this.db = null;
    }
  }