mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -65,11 +65,19 @@
 */
public class DbHandler implements Runnable
{
  // This queue hold all the updates not yet saved to stable storage
  // it is only used as a temporary placeholder so that the write
  // The msgQueue holds all the updates not yet saved to stable storage.
  // This list is only used as a temporary placeholder so that the write
  // in the stable storage can be grouped for efficiency reason.
  // it is never read back by replicationServer threads that are responsible
  // Adding an update synchronously add the update to this list.
  // A dedicated thread loops on flush() and trim().
  // flush() : get a number of changes from the in memory list by block
  //           and write them to the db.
  // trim()  : deletes from the DB a number of changes that are older than a
  //           certain date.
  //
  // Changes are not read back by replicationServer threads that are responsible
  // for pushing the changes to other replication server or to LDAP server
  //
  private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>();
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
@@ -81,6 +89,8 @@
  private boolean done = false;
  private DirectoryThread thread = null;
  private Object flushLock = new Object();
  private ReplicationDbEnv dbenv;
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
@@ -90,23 +100,29 @@
  final static int MSG_QUEUE_LOWMARK = 4000;
  /**
   * the trim age in milliseconds.
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimage;
  /**
   * Creates a New dbHandler associated to a given LDAP server.
   * Creates a new dbHandler associated to a given LDAP server.
   *
   * @param id Identifier of the DB.
   * @param baseDn the baseDn for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @param generationId The generationId of the data contained in the LDAP
   * server for this domain.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
      ReplicationDbEnv dbenv, long generationId)
         throws DatabaseException
  {
    this.dbenv = dbenv;
    this.serverId = id;
    this.baseDn = baseDn;
    this.trimage = replicationServer.getTrimage();
@@ -261,7 +277,7 @@
   *
   * @param number the number of changes to be removed.
   */
  private void clear(int number)
  private void clearQueue(int number)
  {
    synchronized (msgQueue)
    {
@@ -346,7 +362,7 @@
  }
  /**
   * Flush old change information from this replicationServer database.
   * Trim old changes from this replicationServer database.
   * @throws DatabaseException In case of database problem.
   */
  private void trim() throws DatabaseException, Exception
@@ -417,7 +433,7 @@
        db.addEntries(changes);
        // remove the changes from the list of changes to be saved.
        clear(changes.size());
        clearQueue(changes.size());
      }
    } while (size >=500);
  }
@@ -517,4 +533,22 @@
    trimage = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * @throws DatabaseException When an exception occurs while removing the
   * changes from the DB.
   * @throws Exception When an exception occurs while accessing a resource
   * from the DB.
   *
   */
  public void clear() throws DatabaseException, Exception
  {
    synchronized(flushLock)
    {
      msgQueue.clear();
    }
    db.clear();
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
  }
}