From cd092a8cf0e5ff9b6b0df1c56e3ddfdd86a4aeaa Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 14:51:06 +0000
Subject: [PATCH] Fix failure of replication Dependency unit test by rethinking ReplicationIterator

---
 opends/src/server/org/opends/server/replication/server/ReplicationIterator.java                   |   76 +++++++++++++--
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java |  139 ++++++++++++++++++++++++++-
 opends/src/server/org/opends/server/replication/server/DbHandler.java                             |   67 ++++---------
 3 files changed, 222 insertions(+), 60 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 4ed5700..ca05642 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@
 import org.opends.messages.MessageBuilder;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
@@ -36,7 +35,6 @@
 import java.util.Date;
 import java.util.List;
 import java.util.LinkedList;
-import java.util.NoSuchElementException;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.DirectoryThread;
@@ -47,7 +45,6 @@
 import org.opends.server.types.InitializationException;
 import org.opends.server.util.TimeThread;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
@@ -123,7 +120,6 @@
    *
    */
   private long trimage;
-  private static final DebugTracer TRACER = getTracer();
 
   /**
    * Creates a new dbHandler associated to a given LDAP server.
@@ -206,8 +202,8 @@
 
   /**
    * Get some changes out of the message queue of the LDAP server.
-   *
-   * @param number the number of messages to extract.
+   * (from the beginning of the queue)
+   * @param number the maximum number of messages to extract.
    * @return a List containing number changes extracted from the queue.
    */
   private List<UpdateMsg> getChanges(int number)
@@ -281,50 +277,17 @@
   public ReplicationIterator generateIterator(ChangeNumber changeNumber)
                            throws DatabaseException, Exception
   {
-    /*
-     * When we create an iterator we need to make sure that we
-     * don't miss some changes because the iterator is created
-     * close to the limit of the changed that have not yet been
-     * flushed to the database.
-     * We detect this by comparing the date of the changeNumber where
-     * we want to start with the date of the first ChangeNumber
-     * of the msgQueue.
-     * If this is the case we flush the queue to the database.
-     */
-    ChangeNumber recentChangeNumber = null;
-
     if (changeNumber == null)
     {
       flush();
     }
-    synchronized (msgQueue)
-    {
-      try
-      {
-        UpdateMsg msg = msgQueue.getLast();
-        recentChangeNumber = msg.getChangeNumber();
-      }
-      catch (NoSuchElementException e)
-      {}
-    }
-
-    if ( (recentChangeNumber != null) && (changeNumber != null))
-    {
-      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
-         ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
-      {
-        flush();
-      }
-    }
-
     ReplicationIterator it =
-      new ReplicationIterator(serverId, db, changeNumber);
-
+      new ReplicationIterator(serverId, db, changeNumber, this);
     return it;
   }
 
   /**
-   * Removes message in a subList of the msgQueue from the msgQueue.
+   * Removes the provided number of messages from the beginning of the msgQueue.
    *
    * @param number the number of changes to be removed.
    */
@@ -335,7 +298,7 @@
       int current = 0;
       while ((current < number) && (!msgQueue.isEmpty()))
       {
-        UpdateMsg msg = msgQueue.remove();
+        UpdateMsg msg = msgQueue.remove(); // remove first
         queueByteSize -= msg.size();
         current++;
       }
@@ -508,8 +471,10 @@
 
   /**
    * Flush a number of updates from the memory list to the stable storage.
+   * Flush is done by chunk sized to 500 messages, starting from the
+   * beginning of the list.
    */
-  private void flush()
+  public void flush()
   {
     int size;
     int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
@@ -518,7 +483,8 @@
     {
       synchronized(flushLock)
       {
-        // get N messages to save in the DB
+        // get N (or less) messages from the queue to save to the DB
+        // (from the beginning of the queue)
         List<UpdateMsg> changes = getChanges(chunksize);
 
         // if no more changes to save exit immediately.
@@ -528,7 +494,8 @@
         // save the change to the stable storage.
         db.addEntries(changes);
 
-        // remove the changes from the list of changes to be saved.
+        // remove the changes from the list of changes to be saved
+        // (remove from the beginning of the queue)
         clearQueue(changes.size());
       }
     } while (size >= chunksize);
@@ -668,4 +635,14 @@
   {
     return this.serverId;
   }
+
+  /**
+   * Return the size of the msgQueue (the memory cache of the DbHandler).
+   * For test purpose.
+   * @return The memory queue size.
+   */
+  public int getQueueSize()
+  {
+    return this.msgQueue.size();
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index 6e0079b..795f7cf 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -40,6 +40,9 @@
 {
   private UpdateMsg currentChange = null;
   private ReplServerDBCursor cursor = null;
+  private DbHandler dbh;
+  private ReplicationDB db;
+  ChangeNumber lastNonNullCurrentCN;
 
   /**
    * Creates a new ReplicationIterator.
@@ -49,18 +52,40 @@
    * @param id the Identifier of the server on which the iterator applies.
    * @param db The db where the iterator must be created.
    * @param changeNumber The ChangeNumber after which the iterator must start.
+   * @param dbh The associated DbHandler.
    * @throws Exception If there is no other change to push after change
    *         with changeNumber number.
    * @throws DatabaseException if a database problem happened.
    */
   public ReplicationIterator(
-          short id, ReplicationDB db, ChangeNumber changeNumber)
+          short id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
           throws Exception, DatabaseException
   {
-    cursor = db.openReadCursor(changeNumber);
+    this.db = db;
+    this.dbh = dbh;
+    this.lastNonNullCurrentCN = changeNumber;
+
+    try
+    {
+      cursor = db.openReadCursor(changeNumber);
+    }
+    catch(Exception e)
+    {
+      // we didn't find it in the db
+      cursor = null;
+    }
+
     if (cursor == null)
     {
-      throw new Exception("no new change");
+      // flush the queue into the db
+      dbh.flush();
+
+      // look again in the db
+      cursor = db.openReadCursor(changeNumber);
+      if (cursor == null)
+      {
+        throw new Exception("no new change");
+      }
     }
   }
 
@@ -80,18 +105,47 @@
    */
   public boolean next()
   {
-    currentChange = cursor.next();
+    boolean hasNext = false;
+
+    currentChange = cursor.next(); // can return null
 
     if (currentChange != null)
-      return true;
+    {
+      lastNonNullCurrentCN = currentChange.getChangeNumber();
+      hasNext = true;
+    }
     else
     {
-      // TODO : should check here if some changes are still in the
-      // dbHandler message queue and not yet saved to the backing database
-      // if yes should get change from there from now on.
-      return false;
+      synchronized (this)
+      {
+        if (cursor != null)
+        {
+          cursor.close();
+          cursor = null;
+        }
+        dbh.flush();
+        try
+        {
+          cursor = db.openReadCursor(lastNonNullCurrentCN);
+          currentChange = cursor.next(); // can return null
+          lastNonNullCurrentCN = currentChange.getChangeNumber();
+          if (currentChange != null)
+          {
+            hasNext = true;
+          }
+          else
+          {
+            hasNext = false;
+          }
+        }
+        catch(Exception e)
+        {
+          currentChange = null;
+          hasNext = false;
+        }
+      }
     }
-
+    return hasNext;
   }
 
   /**
@@ -108,6 +162,8 @@
         cursor.close();
         cursor = null;
       }
+      this.dbh = null;
+      this.db = null;
     }
   }
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index ff51863..b84564e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -50,6 +50,7 @@
     ReplicationServer replicationServer = null;
     ReplicationDbEnv dbEnv = null;
     DbHandler handler = null;
+    ReplicationIterator it = null;
     try
     {
       TestCaseUtils.startServer();
@@ -85,22 +86,145 @@
       ChangeNumber changeNumber1 = gen.newChangeNumber();
       ChangeNumber changeNumber2 = gen.newChangeNumber();
       ChangeNumber changeNumber3 = gen.newChangeNumber();
+      ChangeNumber changeNumber4 = gen.newChangeNumber();
+      ChangeNumber changeNumber5 = gen.newChangeNumber();
 
       DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
         "uid");
       DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
         "uid");
       DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
-        "uid");
+      "uid");
+      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4,
+      "uid");
 
       handler.add(update1);
       handler.add(update2);
       handler.add(update3);
 
-      // The ChangeNumber should not get purged
+      //--
+      // Iterator tests with memory queue only populated
+
+      // verify that memory queue is populated
+      assertEquals(handler.getQueueSize(),3);
+
+      // Iterator from existing CN
+      it = handler.generateIterator(changeNumber1);
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
+          " Actual change number=" + it.getChange().getChangeNumber() +
+          " Expect change number=" + changeNumber2);
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertFalse(it.next());
+      it.releaseCursor();
+      it=null;
+
+      // Iterator from NON existing CN
+      Exception ec = null;
+      try
+      {
+        it = handler.generateIterator(changeNumber5);
+      }
+      catch(Exception e)
+      {
+        ec = e;
+      }
+      assertNotNull(ec);
+      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+      
+      //--
+      // Iterator tests with db only populated
+      Thread.sleep(1000); // let the time for flush to happen
+
+      // verify that memory queue is empty (all changes flushed in the db)
+      assertEquals(handler.getQueueSize(),0);
+
+      // Test iterator from existing CN
+      it = handler.generateIterator(changeNumber1);
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertFalse(it.next());
+      it.releaseCursor();
+      it=null;
+
+      // Iterator from NON existing CN
+      ec = null;
+      try
+      {
+        it = handler.generateIterator(changeNumber5);
+      }
+      catch(Exception e)
+      {
+        ec = e;
+      }
+      assertNotNull(ec);
+      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+      
+      // Test first and last
       assertEquals(changeNumber1, handler.getFirstChange());
       assertEquals(changeNumber3, handler.getLastChange());
 
+      //--
+      // Iterator tests with db and memory queue populated
+      // all changes in the db - add one in the memory queue
+      handler.add(update4);
+
+      // verify memory queue contains this one
+      assertEquals(handler.getQueueSize(),1);
+
+      // Test iterator from existing CN
+      it = handler.generateIterator(changeNumber1);
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertFalse(it.next());
+      assertTrue(it.getChange()==null);
+      it.releaseCursor();
+      it=null;
+
+      // Test iterator from existing CN at the limit between queue and db
+      it = handler.generateIterator(changeNumber3);
+      assertTrue(it.next());
+      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
+          " Actual change number=" + it.getChange().getChangeNumber());
+      assertFalse(it.next());
+      assertTrue(it.getChange()==null);
+      it.releaseCursor();
+      it=null;
+
+      // Test iterator from existing CN at the limit between queue and db
+      it = handler.generateIterator(changeNumber4);
+      assertFalse(it.next());
+      assertTrue(it.getChange()==null,
+          " Actual change number=" + it.getChange());
+      it.releaseCursor();
+      it=null;
+
+      // Test iterator from NON existing CN
+      ec = null;
+      try
+      {
+        it = handler.generateIterator(changeNumber5);
+      }
+      catch(Exception e)
+      {
+        ec = e;
+      }
+      assertNotNull(ec);
+      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+      
       handler.setPurgeDelay(1);
 
       boolean purged = false;
@@ -109,8 +233,8 @@
       {
         ChangeNumber firstChange = handler.getFirstChange();
         ChangeNumber lastChange = handler.getLastChange();
-        if ((!firstChange.equals(changeNumber3) ||
-          (!lastChange.equals(changeNumber3))))
+        if ((!firstChange.equals(changeNumber4) ||
+          (!lastChange.equals(changeNumber4))))
         {
           TestCaseUtils.sleep(100);
         } else
@@ -120,6 +244,11 @@
       }
     } finally
     {
+      if (it != null)
+      {
+        it.releaseCursor();
+        it=null;
+      }
       if (handler != null)
         handler.shutdown();
       if (dbEnv != null)

--
Gitblit v1.10.0