mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 f73b655466092169abac34833fb628fce1fcdebe
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -42,12 +42,13 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
@@ -79,8 +80,8 @@
  // Changes are not read back by replicationServer threads that are responsible
  // for pushing the changes to other replication server or to LDAP server
  //
  private final LinkedList<UpdateMessage> msgQueue =
    new LinkedList<UpdateMessage>();
  private final LinkedList<UpdateMsg> msgQueue =
    new LinkedList<UpdateMsg>();
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
@@ -148,7 +149,8 @@
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
    thread = new DirectoryThread(this,
                                 "Replication Server db " + id + " " +  baseDn);
      "Replication Server db for DS " + id + " and " + baseDn + " in RS " +
      replicationServer.getServerId());
    thread.start();
    DirectoryServer.deregisterMonitorProvider(
@@ -165,7 +167,7 @@
   * @param update The update that must be saved to the db managed by this db
   *               handler.
   */
  public void add(UpdateMessage update)
  public void add(UpdateMsg update)
  {
    synchronized (msgQueue)
    {
@@ -199,17 +201,17 @@
   * @param number the number of messages to extract.
   * @return a List containing number changes extracted from the queue.
   */
  private List<UpdateMessage> getChanges(int number)
  private List<UpdateMsg> getChanges(int number)
  {
    int current = 0;
    LinkedList<UpdateMessage> changes = new LinkedList<UpdateMessage>();
    LinkedList<UpdateMsg> changes = new LinkedList<UpdateMsg>();
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      while ((current < number) && (current < size))
      {
        UpdateMessage msg = msgQueue.get(current);
        UpdateMsg msg = msgQueue.get(current);
        current++;
        changes.add(msg);
      }
@@ -289,7 +291,7 @@
    {
      try
      {
        UpdateMessage msg = msgQueue.getFirst();
        UpdateMsg msg = msgQueue.getFirst();
        recentChangeNumber = msg.getChangeNumber();
      }
      catch (NoSuchElementException e)
@@ -323,7 +325,7 @@
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        UpdateMessage msg = msgQueue.remove();
        UpdateMsg msg = msgQueue.remove();
        queueByteSize -= msg.size();
        current++;
      }
@@ -499,7 +501,7 @@
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(chunksize);
        List<UpdateMsg> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
        if ((changes == null) || ((size = changes.size()) == 0))
@@ -532,25 +534,27 @@
    public ArrayList<Attribute> getMonitorData()
    {
      ArrayList<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(new Attribute("replicationServer-database",
                                   String.valueOf(serverId)));
      attributes.add(new Attribute("base-dn", baseDn.toString()));
      attributes.add(Attributes.create("replicationServer-database",
          String.valueOf(serverId)));
      attributes.add(Attributes.create("base-dn", baseDn.toString()));
      if (firstChange != null)
      {
        Date firstTime = new Date(firstChange.getTime());
        attributes.add(new Attribute("first-change",
            firstChange.toString() + " " + firstTime.toString()));
        attributes.add(Attributes.create("first-change", firstChange
            .toString()
            + " " + firstTime.toString()));
      }
      if (lastChange != null)
      {
        Date lastTime = new Date(lastChange.getTime());
        attributes.add(new Attribute("last-change",
            lastChange.toString() + " " + lastTime.toString()));
        attributes.add(Attributes.create("last-change", lastChange
            .toString()
            + " " + lastTime.toString()));
      }
      attributes.add(
          new Attribute("queue-size", String.valueOf(msgQueue.size())));
          Attributes.create("queue-size", String.valueOf(msgQueue.size())));
      attributes.add(
          new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
          Attributes.create("queue-size-bytes", String.valueOf(queueByteSize)));
      return attributes;
    }