mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
24.51.2009 cd092a8cf0e5ff9b6b0df1c56e3ddfdd86a4aeaa
Fix failure of replication Dependency unit test by rethinking ReplicationIterator
3 files modified
282 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 67 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java 76 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java 139 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,7 +35,6 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -47,7 +45,6 @@
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
@@ -123,7 +120,6 @@
   *
   */
  private long trimage;
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -206,8 +202,8 @@
  /**
   * Get some changes out of the message queue of the LDAP server.
   *
   * @param number the number of messages to extract.
   * (from the beginning of the queue)
   * @param number the maximum number of messages to extract.
   * @return a List containing number changes extracted from the queue.
   */
  private List<UpdateMsg> getChanges(int number)
@@ -281,50 +277,17 @@
  public ReplicationIterator generateIterator(ChangeNumber changeNumber)
                           throws DatabaseException, Exception
  {
    /*
     * When we create an iterator we need to make sure that we
     * don't miss some changes because the iterator is created
     * close to the limit of the changed that have not yet been
     * flushed to the database.
     * We detect this by comparing the date of the changeNumber where
     * we want to start with the date of the first ChangeNumber
     * of the msgQueue.
     * If this is the case we flush the queue to the database.
     */
    ChangeNumber recentChangeNumber = null;
    if (changeNumber == null)
    {
      flush();
    }
    synchronized (msgQueue)
    {
      try
      {
        UpdateMsg msg = msgQueue.getLast();
        recentChangeNumber = msg.getChangeNumber();
      }
      catch (NoSuchElementException e)
      {}
    }
    if ( (recentChangeNumber != null) && (changeNumber != null))
    {
      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
         ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
      {
        flush();
      }
    }
    ReplicationIterator it =
      new ReplicationIterator(serverId, db, changeNumber);
      new ReplicationIterator(serverId, db, changeNumber, this);
    return it;
  }
  /**
   * Removes message in a subList of the msgQueue from the msgQueue.
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
   */
@@ -335,7 +298,7 @@
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        UpdateMsg msg = msgQueue.remove();
        UpdateMsg msg = msgQueue.remove(); // remove first
        queueByteSize -= msg.size();
        current++;
      }
@@ -508,8 +471,10 @@
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * Flush is done by chunk sized to 500 messages, starting from the
   * beginning of the list.
   */
  private void flush()
  public void flush()
  {
    int size;
    int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
@@ -518,7 +483,8 @@
    {
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        // get N (or less) messages from the queue to save to the DB
        // (from the beginning of the queue)
        List<UpdateMsg> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
@@ -528,7 +494,8 @@
        // save the change to the stable storage.
        db.addEntries(changes);
        // remove the changes from the list of changes to be saved.
        // remove the changes from the list of changes to be saved
        // (remove from the beginning of the queue)
        clearQueue(changes.size());
      }
    } while (size >= chunksize);
@@ -668,4 +635,14 @@
  {
    return this.serverId;
  }
  /**
   * Return the size of the msgQueue (the memory cache of the DbHandler).
   * For test purpose.
   * @return The memory queue size.
   */
  public int getQueueSize()
  {
    return this.msgQueue.size();
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -40,6 +40,9 @@
{
  private UpdateMsg currentChange = null;
  private ReplServerDBCursor cursor = null;
  private DbHandler dbh;
  private ReplicationDB db;
  ChangeNumber lastNonNullCurrentCN;
  /**
   * Creates a new ReplicationIterator.
@@ -49,18 +52,40 @@
   * @param id the Identifier of the server on which the iterator applies.
   * @param db The db where the iterator must be created.
   * @param changeNumber The ChangeNumber after which the iterator must start.
   * @param dbh The associated DbHandler.
   * @throws Exception If there is no other change to push after change
   *         with changeNumber number.
   * @throws DatabaseException if a database problem happened.
   */
  public ReplicationIterator(
          short id, ReplicationDB db, ChangeNumber changeNumber)
          short id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
          throws Exception, DatabaseException
  {
    cursor = db.openReadCursor(changeNumber);
    this.db = db;
    this.dbh = dbh;
    this.lastNonNullCurrentCN = changeNumber;
    try
    {
      cursor = db.openReadCursor(changeNumber);
    }
    catch(Exception e)
    {
      // we didn't find it in the db
      cursor = null;
    }
    if (cursor == null)
    {
      throw new Exception("no new change");
      // flush the queue into the db
      dbh.flush();
      // look again in the db
      cursor = db.openReadCursor(changeNumber);
      if (cursor == null)
      {
        throw new Exception("no new change");
      }
    }
  }
@@ -80,18 +105,47 @@
   */
  public boolean next()
  {
    currentChange = cursor.next();
    boolean hasNext = false;
    currentChange = cursor.next(); // can return null
    if (currentChange != null)
      return true;
    {
      lastNonNullCurrentCN = currentChange.getChangeNumber();
      hasNext = true;
    }
    else
    {
      // TODO : should check here if some changes are still in the
      // dbHandler message queue and not yet saved to the backing database
      // if yes should get change from there from now on.
      return false;
      synchronized (this)
      {
        if (cursor != null)
        {
          cursor.close();
          cursor = null;
        }
        dbh.flush();
        try
        {
          cursor = db.openReadCursor(lastNonNullCurrentCN);
          currentChange = cursor.next(); // can return null
          lastNonNullCurrentCN = currentChange.getChangeNumber();
          if (currentChange != null)
          {
            hasNext = true;
          }
          else
          {
            hasNext = false;
          }
        }
        catch(Exception e)
        {
          currentChange = null;
          hasNext = false;
        }
      }
    }
    return hasNext;
  }
  /**
@@ -108,6 +162,8 @@
        cursor.close();
        cursor = null;
      }
      this.dbh = null;
      this.db = null;
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -50,6 +50,7 @@
    ReplicationServer replicationServer = null;
    ReplicationDbEnv dbEnv = null;
    DbHandler handler = null;
    ReplicationIterator it = null;
    try
    {
      TestCaseUtils.startServer();
@@ -85,22 +86,145 @@
      ChangeNumber changeNumber1 = gen.newChangeNumber();
      ChangeNumber changeNumber2 = gen.newChangeNumber();
      ChangeNumber changeNumber3 = gen.newChangeNumber();
      ChangeNumber changeNumber4 = gen.newChangeNumber();
      ChangeNumber changeNumber5 = gen.newChangeNumber();
      DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
        "uid");
      DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
        "uid");
      DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
        "uid");
      "uid");
      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4,
      "uid");
      handler.add(update1);
      handler.add(update2);
      handler.add(update3);
      // The ChangeNumber should not get purged
      //--
      // Iterator tests with memory queue only populated
      // verify that memory queue is populated
      assertEquals(handler.getQueueSize(),3);
      // Iterator from existing CN
      it = handler.generateIterator(changeNumber1);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
          " Actual change number=" + it.getChange().getChangeNumber() +
          " Expect change number=" + changeNumber2);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      it.releaseCursor();
      it=null;
      // Iterator from NON existing CN
      Exception ec = null;
      try
      {
        it = handler.generateIterator(changeNumber5);
      }
      catch(Exception e)
      {
        ec = e;
      }
      assertNotNull(ec);
      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
      //--
      // Iterator tests with db only populated
      Thread.sleep(1000); // let the time for flush to happen
      // verify that memory queue is empty (all changes flushed in the db)
      assertEquals(handler.getQueueSize(),0);
      // Test iterator from existing CN
      it = handler.generateIterator(changeNumber1);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      it.releaseCursor();
      it=null;
      // Iterator from NON existing CN
      ec = null;
      try
      {
        it = handler.generateIterator(changeNumber5);
      }
      catch(Exception e)
      {
        ec = e;
      }
      assertNotNull(ec);
      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
      // Test first and last
      assertEquals(changeNumber1, handler.getFirstChange());
      assertEquals(changeNumber3, handler.getLastChange());
      //--
      // Iterator tests with db and memory queue populated
      // all changes in the db - add one in the memory queue
      handler.add(update4);
      // verify memory queue contains this one
      assertEquals(handler.getQueueSize(),1);
      // Test iterator from existing CN
      it = handler.generateIterator(changeNumber1);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      assertTrue(it.getChange()==null);
      it.releaseCursor();
      it=null;
      // Test iterator from existing CN at the limit between queue and db
      it = handler.generateIterator(changeNumber3);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      assertTrue(it.getChange()==null);
      it.releaseCursor();
      it=null;
      // Test iterator from existing CN at the limit between queue and db
      it = handler.generateIterator(changeNumber4);
      assertFalse(it.next());
      assertTrue(it.getChange()==null,
          " Actual change number=" + it.getChange());
      it.releaseCursor();
      it=null;
      // Test iterator from NON existing CN
      ec = null;
      try
      {
        it = handler.generateIterator(changeNumber5);
      }
      catch(Exception e)
      {
        ec = e;
      }
      assertNotNull(ec);
      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
      handler.setPurgeDelay(1);
      boolean purged = false;
@@ -109,8 +233,8 @@
      {
        ChangeNumber firstChange = handler.getFirstChange();
        ChangeNumber lastChange = handler.getLastChange();
        if ((!firstChange.equals(changeNumber3) ||
          (!lastChange.equals(changeNumber3))))
        if ((!firstChange.equals(changeNumber4) ||
          (!lastChange.equals(changeNumber4))))
        {
          TestCaseUtils.sleep(100);
        } else
@@ -120,6 +244,11 @@
      }
    } finally
    {
      if (it != null)
      {
        it.releaseCursor();
        it=null;
      }
      if (handler != null)
        handler.shutdown();
      if (dbEnv != null)