mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.43.2013 d1329a75c16c13463812c6a65c2ce77450f94b88
DbHandler.java, ReplicationDB.java:
Javadoc / code cleanups.
2 files modified
182 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java 103 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java 79 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -26,29 +26,29 @@
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.List;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -58,45 +58,54 @@
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * This class is also able to generate a ReplicationIterator that can be
 * used to read all changes from a given ChangeNUmber.
 * used to read all changes from a given ChangeNumber.
 *
 * This class publish some monitoring information below cn=monitor.
 *
 */
public class DbHandler implements Runnable
{
  // The msgQueue holds all the updates not yet saved to stable storage.
  // This list is only used as a temporary placeholder so that the write
  // in the stable storage can be grouped for efficiency reason.
  // Adding an update synchronously add the update to this list.
  // A dedicated thread loops on flush() and trim().
  // flush() : get a number of changes from the in memory list by block
  //           and write them to the db.
  // trim()  : deletes from the DB a number of changes that are older than a
  //           certain date.
  //
  // Changes are not read back by replicationServer threads that are responsible
  // for pushing the changes to other replication server or to LDAP server
  //
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This list is only used as a temporary placeholder so that the write in the
   * stable storage can be grouped for efficiency reason. Adding an update
   * synchronously add the update to this list. A dedicated thread loops on
   * flush() and trim().
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
   * to the db.</dd>
   * <dt>trim()</dt>
   * <dd>deletes from the DB a number of changes that are older than a certain
   * date.</dd>
   * </dl>
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedList<UpdateMsg> msgQueue =
    new LinkedList<UpdateMsg>();
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueMaxSize = 5000;
  int queueLowmark = 1000;
  int queueHimark = 4000;
  /**
   * The High and low water mark for the max size of the msgQueue. The threads
   * calling add() method will be blocked if the size of msgQueue becomes larger
   * than the queueHimark and will resume only when the size of the msgQueue
   * goes below queueLowmark.
   */
  private int queueMaxSize = 5000;
  private int queueLowmark = 1000;
  private int queueHimark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueMaxBytes = 100 * queueMaxSize;
  int queueLowmarkBytes = 100 * queueLowmark;
  int queueHimarkBytes = 100 * queueHimark;
  /**
   * The queue himark and lowmark in bytes, this is set to 100 times the himark
   * and lowmark in number of updates.
   */
  private int queueMaxBytes = 100 * queueMaxSize;
  private int queueLowmarkBytes = 100 * queueLowmark;
  private int queueHimarkBytes = 100 * queueHimark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
  /** The number of bytes currently in the queue */
  private int queueByteSize = 0;
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
@@ -113,10 +122,8 @@
  private long latestTrimDate = 0;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimAge;
@@ -195,7 +202,9 @@
        lastChange = update.getChangeNumber();
      }
      if (firstChange == null)
      {
        firstChange = update.getChangeNumber();
      }
    }
  }
@@ -299,9 +308,11 @@
        queueByteSize -= msg.size();
        current++;
      }
      if ((msgQueue.size() < queueLowmark) &&
          (queueByteSize < queueLowmarkBytes))
      if ((msgQueue.size() < queueLowmark)
          && (queueByteSize < queueLowmarkBytes))
      {
        msgQueue.notifyAll();
      }
    }
  }
@@ -334,7 +345,9 @@
    }
    while (msgQueue.size() != 0)
    {
      flush();
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -345,6 +358,7 @@
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   */
  @Override
  public void run()
  {
    while (!shutdown)
@@ -650,16 +664,15 @@
   */
  public int getCount(ChangeNumber from, ChangeNumber to)
  {
    int c=0;
    // Now that we always keep the last ChangeNumber in the DB to avoid
    // expiring cookies to quickly, we need to check if the "to"
    // expiring cookies too quickly, we need to check if the "to"
    // is older than the trim date.
    if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0)))
    {
      flush();
      c = db.count(from, to);
      return db.count(from, to);
    }
    return c;
    return 0;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -27,21 +27,19 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.getBytes;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.server.util.StaticUtils;
import com.sleepycat.je.*;
@@ -58,8 +56,10 @@
  private int serverId;
  private String baseDn;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  /**
   * The lock used to provide exclusive access to the thread that close the db
   * (shutdown or clear).
   */
  private ReentrantReadWriteLock dbCloseLock;
  // Change counter management
@@ -85,20 +85,22 @@
  //     since the previous counter record
  // 2/- the change to be stored has a new timestamp - so that the counter
  //     record is the first record for this timestamp.
  //
  /** Current value of the counter. */
  private int  counterCurrValue = 1;
  // Current value of the counter.
  /**
   * When not null, the next change with a ts different from
   * tsForNewCounterRecord will lead to store a new counterRecord.
   */
  private long counterTsLimit = 0;
  // When not null,
  // the next change with a ts different from tsForNewCounterRecord will lead
  // to store a new counterRecord.
  /**
   * The counter record will never be written to the db more often than each
   * counterWindowSize changes.
   */
  private int  counterWindowSize = 1000;
  // The counter record will never be written to the db more often than each
  // counterWindowSize changes.
 /**
   * Creates a new database or open existing database that will be used
@@ -126,7 +128,6 @@
    dbCloseLock = new ReentrantReadWriteLock(true);
    //
    Cursor cursor;
    Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
@@ -292,17 +293,7 @@
  {
    try
    {
      if (cursor != null)
      {
        try
        {
          cursor.close();
        }
        catch (DatabaseException e)
        {
          // Ignore.
        }
      }
      StaticUtils.close(cursor);
    }
    finally
    {
@@ -458,9 +449,10 @@
   */
  public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber)
  {
    if (changeNumber == null)
    {
      return null;
    }
    Cursor cursor = null;
    ChangeNumber cn = null;
@@ -565,11 +557,15 @@
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  public class ReplServerDBCursor
  public class ReplServerDBCursor implements Closeable
  {
     // The transaction that will protect the actions done with the cursor
    // Will be let null for a read cursor
    // Will be set non null for a write cursor
    /**
     * The transaction that will protect the actions done with the cursor
     * <p>
     * Will be let null for a read cursor
     * <p>
     * Will be set non null for a write cursor
     */
    private final Transaction txn;
    private final Cursor cursor;
    private final DatabaseEntry key;
@@ -706,6 +702,7 @@
    /**
     * Close the ReplicationServer Cursor.
     */
    @Override
    public void close()
    {
      synchronized (this)
@@ -1111,10 +1108,10 @@
    this.counterWindowSize = size;
  }
  // Returns {@code true} if the DB is closed. This method assumes that either
  // the db read/write lock has been taken.
  /**
   * Returns {@code true} if the DB is closed. This method assumes that either
   * the db read/write lock has been taken.
   */
  private boolean isDBClosed()
  {
    return db == null;