From cd092a8cf0e5ff9b6b0df1c56e3ddfdd86a4aeaa Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 14:51:06 +0000
Subject: [PATCH] Fix failure of replication Dependency unit test by rethinking ReplicationIterator

---
 opends/src/server/org/opends/server/replication/server/DbHandler.java |   67 +++++++++++----------------------
 1 files changed, 22 insertions(+), 45 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 4ed5700..ca05642 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@
 import org.opends.messages.MessageBuilder;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
@@ -36,7 +35,6 @@
 import java.util.Date;
 import java.util.List;
 import java.util.LinkedList;
-import java.util.NoSuchElementException;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.DirectoryThread;
@@ -47,7 +45,6 @@
 import org.opends.server.types.InitializationException;
 import org.opends.server.util.TimeThread;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
@@ -123,7 +120,6 @@
    *
    */
   private long trimage;
-  private static final DebugTracer TRACER = getTracer();
 
   /**
    * Creates a new dbHandler associated to a given LDAP server.
@@ -206,8 +202,8 @@
 
   /**
    * Get some changes out of the message queue of the LDAP server.
-   *
-   * @param number the number of messages to extract.
+   * (from the beginning of the queue)
+   * @param number the maximum number of messages to extract.
    * @return a List containing number changes extracted from the queue.
    */
   private List<UpdateMsg> getChanges(int number)
@@ -281,50 +277,17 @@
   public ReplicationIterator generateIterator(ChangeNumber changeNumber)
                            throws DatabaseException, Exception
   {
-    /*
-     * When we create an iterator we need to make sure that we
-     * don't miss some changes because the iterator is created
-     * close to the limit of the changed that have not yet been
-     * flushed to the database.
-     * We detect this by comparing the date of the changeNumber where
-     * we want to start with the date of the first ChangeNumber
-     * of the msgQueue.
-     * If this is the case we flush the queue to the database.
-     */
-    ChangeNumber recentChangeNumber = null;
-
     if (changeNumber == null)
     {
       flush();
     }
-    synchronized (msgQueue)
-    {
-      try
-      {
-        UpdateMsg msg = msgQueue.getLast();
-        recentChangeNumber = msg.getChangeNumber();
-      }
-      catch (NoSuchElementException e)
-      {}
-    }
-
-    if ( (recentChangeNumber != null) && (changeNumber != null))
-    {
-      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
-         ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
-      {
-        flush();
-      }
-    }
-
     ReplicationIterator it =
-      new ReplicationIterator(serverId, db, changeNumber);
-
+      new ReplicationIterator(serverId, db, changeNumber, this);
     return it;
   }
 
   /**
-   * Removes message in a subList of the msgQueue from the msgQueue.
+   * Removes the provided number of messages from the beginning of the msgQueue.
    *
    * @param number the number of changes to be removed.
    */
@@ -335,7 +298,7 @@
       int current = 0;
       while ((current < number) && (!msgQueue.isEmpty()))
       {
-        UpdateMsg msg = msgQueue.remove();
+        UpdateMsg msg = msgQueue.remove(); // remove first
         queueByteSize -= msg.size();
         current++;
       }
@@ -508,8 +471,10 @@
 
   /**
    * Flush a number of updates from the memory list to the stable storage.
+   * Flush is done by chunk sized to 500 messages, starting from the
+   * beginning of the list.
    */
-  private void flush()
+  public void flush()
   {
     int size;
     int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
@@ -518,7 +483,8 @@
     {
       synchronized(flushLock)
       {
-        // get N messages to save in the DB
+        // get N (or less) messages from the queue to save to the DB
+        // (from the beginning of the queue)
         List<UpdateMsg> changes = getChanges(chunksize);
 
         // if no more changes to save exit immediately.
@@ -528,7 +494,8 @@
         // save the change to the stable storage.
         db.addEntries(changes);
 
-        // remove the changes from the list of changes to be saved.
+        // remove the changes from the list of changes to be saved
+        // (remove from the beginning of the queue)
         clearQueue(changes.size());
       }
     } while (size >= chunksize);
@@ -668,4 +635,14 @@
   {
     return this.serverId;
   }
+
+  /**
+   * Return the size of the msgQueue (the memory cache of the DbHandler).
+   * For test purpose.
+   * @return The memory queue size.
+   */
+  public int getQueueSize()
+  {
+    return this.msgQueue.size();
+  }
 }

--
Gitblit v1.10.0