mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
24.51.2009 5cbf2ca5558c4e978745f7d8b9197ba978faf1cb
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,7 +35,6 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -47,7 +45,6 @@
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
@@ -123,7 +120,6 @@
   *
   */
  private long trimage;
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -206,8 +202,8 @@
  /**
   * Get some changes out of the message queue of the LDAP server.
   *
   * @param number the number of messages to extract.
   * (from the beginning of the queue)
   * @param number the maximum number of messages to extract.
   * @return a List containing number changes extracted from the queue.
   */
  private List<UpdateMsg> getChanges(int number)
@@ -281,50 +277,17 @@
  public ReplicationIterator generateIterator(ChangeNumber changeNumber)
                           throws DatabaseException, Exception
  {
    /*
     * When we create an iterator we need to make sure that we
     * don't miss some changes because the iterator is created
     * close to the limit of the changed that have not yet been
     * flushed to the database.
     * We detect this by comparing the date of the changeNumber where
     * we want to start with the date of the first ChangeNumber
     * of the msgQueue.
     * If this is the case we flush the queue to the database.
     */
    ChangeNumber recentChangeNumber = null;
    if (changeNumber == null)
    {
      flush();
    }
    synchronized (msgQueue)
    {
      try
      {
        UpdateMsg msg = msgQueue.getLast();
        recentChangeNumber = msg.getChangeNumber();
      }
      catch (NoSuchElementException e)
      {}
    }
    if ( (recentChangeNumber != null) && (changeNumber != null))
    {
      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
         ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
      {
        flush();
      }
    }
    ReplicationIterator it =
      new ReplicationIterator(serverId, db, changeNumber);
      new ReplicationIterator(serverId, db, changeNumber, this);
    return it;
  }
  /**
   * Removes message in a subList of the msgQueue from the msgQueue.
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
   */
@@ -335,7 +298,7 @@
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        UpdateMsg msg = msgQueue.remove();
        UpdateMsg msg = msgQueue.remove(); // remove first
        queueByteSize -= msg.size();
        current++;
      }
@@ -508,8 +471,10 @@
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * Flush is done by chunk sized to 500 messages, starting from the
   * beginning of the list.
   */
  private void flush()
  public void flush()
  {
    int size;
    int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
@@ -518,7 +483,8 @@
    {
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        // get N (or less) messages from the queue to save to the DB
        // (from the beginning of the queue)
        List<UpdateMsg> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
@@ -528,7 +494,8 @@
        // save the change to the stable storage.
        db.addEntries(changes);
        // remove the changes from the list of changes to be saved.
        // remove the changes from the list of changes to be saved
        // (remove from the beginning of the queue)
        clearQueue(changes.size());
      }
    } while (size >= chunksize);
@@ -668,4 +635,14 @@
  {
    return this.serverId;
  }
  /**
   * Return the size of the msgQueue (the memory cache of the DbHandler).
   * For test purpose.
   * @return The memory queue size.
   */
  public int getQueueSize()
  {
    return this.msgQueue.size();
  }
}