mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.40.2008 89c803ce85e8a88d54544bfe35f24c9be34758a3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -297,4 +297,13 @@
  {
    return parentUniqueId;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    return encodedAttributes.length + 40;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -122,4 +122,15 @@
  {
    return ("DEL " + getDn() + " " + getChangeNumber());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    // The DeleteMsg size is mostly dependent on the DN and should never
    // grow very large. It is therefore safe to assume an average of 40 bytes.
    return 40;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -360,4 +360,15 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    // The MODDN message size are mainly dependent on the
    // size of the DN. let's assume that they average on 100 bytes
    return 100;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -235,4 +235,16 @@
  {
    return("MOD " + getDn() + " " + getChangeNumber());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    // The ModifyMsh can be very large when added or deleted attribute
    // values are very large. We therefore need to count the
    // whole encoded msg.
    return encodedMsg.length;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -362,4 +362,11 @@
    }
  }
  /**
   * Return the number of bytes used by this message.
   *
   * @return The number of bytes used by this message.
   */
  public abstract int size();
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@
  //
  private final LinkedList<UpdateMessage> msgQueue =
    new LinkedList<UpdateMessage>();
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueHimark = 5000;
  int queueLowmark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueHimarkBytes = 100 * queueHimark;
  int queueLowmarkBytes = 100 * queueLowmark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
  private ChangeNumber lastChange = null;
@@ -93,14 +109,6 @@
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  MSG_QUEUE_HIMARK and will resume
  // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
  final static int MSG_QUEUE_HIMARK = 5000;
  final static int MSG_QUEUE_LOWMARK = 4000;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
@@ -120,16 +128,22 @@
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @param queueSize The queueSize to use when creating the dbHandler.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
  public DbHandler(
      short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, int queueSize)
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.serverId = id;
    serverId = id;
    this.baseDn = baseDn;
    this.trimage = replicationServer.getTrimage();
    trimage = replicationServer.getTrimage();
    queueHimark = queueSize;
    queueLowmark = queueSize * 4 / 5;
    queueHimarkBytes = 100 * queueHimark;
    queueLowmarkBytes = 100 * queueLowmark;
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
@@ -156,7 +170,7 @@
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      while (size > MSG_QUEUE_HIMARK)
      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
      {
        try
        {
@@ -168,6 +182,7 @@
        size = msgQueue.size();
      }
      queueByteSize += update.size();
      msgQueue.add(update);
      if (lastChange == null || lastChange.older(update.getChangeNumber()))
      {
@@ -308,10 +323,12 @@
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        msgQueue.remove();
        UpdateMessage msg = msgQueue.remove();
        queueByteSize -= msg.size();
        current++;
      }
      if (msgQueue.size() < MSG_QUEUE_LOWMARK)
      if ((msgQueue.size() < queueLowmark) &&
          (queueByteSize < queueLowmarkBytes))
        msgQueue.notify();
    }
  }
@@ -475,13 +492,14 @@
  private void flush()
  {
    int size;
    int chunksize = (500 < queueHimark ? 500 : queueHimark);
    do
    {
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(500);
        List<UpdateMessage> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
        if ((changes == null) || ((size = changes.size()) == 0))
@@ -493,7 +511,7 @@
        // remove the changes from the list of changes to be saved.
        clearQueue(changes.size());
      }
    } while (size >=500);
    } while (size >= chunksize);
  }
  /**
@@ -529,6 +547,10 @@
        attributes.add(new Attribute("last-change",
            lastChange.toString() + " " + lastTime.toString()));
      }
      attributes.add(
          new Attribute("queue-size", String.valueOf(msgQueue.size())));
      attributes.add(
          new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
      return attributes;
    }
@@ -604,6 +626,7 @@
    synchronized(flushLock)
    {
      msgQueue.clear();
      queueByteSize = 0;
    }
    db.clear();
    firstChange = db.readFirstChange();
opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -42,6 +42,9 @@
  private SortedMap<ChangeNumber, UpdateMessage>  map =
    new TreeMap<ChangeNumber, UpdateMessage>();
  // The total number of bytes for all the message in the queue.
  private int bytesCount = 0;
  /**
   * Return the first UpdateMessage in the MsgQueue.
   *
@@ -67,12 +70,22 @@
   *
   * @return The number of elements in this MsgQueue.
   */
  public int size()
  public int count()
  {
    return map.size();
  }
  /**
   * Returns the number of bytes in this MsgQueue.
   *
   * @return The number of bytes in this MsgQueue.
   */
  public int bytesCount()
  {
    return bytesCount;
  }
  /**
   * Returns <tt>true</tt> if this MsgQueue contains no UpdateMessage.
   *
   * @return <tt>true</tt> if this MsgQueue contains no UpdateMessage.
@@ -91,6 +104,7 @@
  public void add(UpdateMessage update)
  {
    map.put(update.getChangeNumber(), update);
    bytesCount += update.size();
  }
  /**
@@ -102,6 +116,7 @@
  {
    UpdateMessage msg = map.get(map.firstKey());
    map.remove(msg.getChangeNumber());
    bytesCount -= msg.size();
    return msg;
  }
@@ -126,5 +141,6 @@
  public void clear()
  {
    map.clear();
    bytesCount = 0;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -257,7 +257,9 @@
              + " serverId=" + serverId);
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this);
            new DbHandler(
                serverId, baseDn, replicationServer, this,
                replicationServer.getQueueSize());
          replicationServer.getReplicationServerDomain(baseDn, true).
          setDbHandler(serverId, dbHandler);
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -134,7 +134,7 @@
  // ID of the backend
  private static final String backendId = "replicationChanges";
  // At startup, the listen thread wait on this flag for the connet
  // At startup, the listen thread wait on this flag for the connect
  // thread to look for other servers in the topology.
  private boolean connectedInTopology = false;
  private final Object connectedInTopologyLock = new Object();
@@ -554,7 +554,7 @@
  public DbHandler newDbHandler(short id, DN baseDn)
  throws DatabaseException
  {
    return new DbHandler(id, baseDn, this, dbEnv);
    return new DbHandler(id, baseDn, this, dbEnv, queueSize);
  }
  /**
@@ -827,6 +827,17 @@
  }
  /**
   * Get the queueSize for this replication server.
   *
   * @return The maximum size of the queues for this Replication Server
   *
   */
  public int getQueueSize()
  {
    return queueSize;
  }
  /**
   * Creates the backend associated to this replication server.
   * @throws ConfigException
   */
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 10000;
  private int maxQueueSize = 5000;
  private int maxQueueBytesSize = maxQueueSize * 100;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
@@ -115,8 +116,8 @@
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controled and should
                                       // be stopped from sending messsages.
                                       // flow controlled and should
                                       // be stopped from sending messages.
  private int saturationCount = 0;
  private short replicationServerId;
@@ -165,6 +166,7 @@
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
    this.protocolVersion = ProtocolVersion.currentVersion();
  }
@@ -305,7 +307,7 @@
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the negociation */
        /* Until here session is encrypted then it depends on the negotiation */
        if (!sslEncryption)
        {
          session.stopEncryption();
@@ -352,7 +354,7 @@
        }
        else
        {
          // We are an empty Replicationserver
          // We are an empty Replication Server
          if ((generationId>0)&&(!serverState.isEmpty()))
          {
            // If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
  {
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      int size = msgQueue.count();
      if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
        return true;
@@ -764,7 +766,7 @@
  {
    synchronized (msgQueue)
    {
      int queueSize = msgQueue.size();
      int queueSize = msgQueue.count();
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.size();
       return msgQueue.count();
     else
     {
       /*
@@ -1034,7 +1036,8 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while (msgQueue.size() > maxQueueSize)
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      {
        setFollowing(false);
        msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change accross all servers.
          // change across all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          while (!iteratorSortedSet.isEmpty() &&
                 (lateQueue.count()<100) &&
                 (lateQueue.bytesCount()<50000) )
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
          {
            synchronized (msgQueue)
            {
              if (msgQueue.size() < maxQueueSize)
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              {
                setFollowing(true);
              }
@@ -1203,7 +1209,7 @@
            {
              if (msgQueue.contains(msg))
              {
                /* we finally catched up with the regular queue */
                /* we finally catch up with the regular queue */
                setFollowing(true);
                lateQueue.clear();
                UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
   * Update the serverState with the last message sent.
   *
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningfull.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean  updateServerState(UpdateMessage msg)
  {
@@ -1599,6 +1605,15 @@
      }
    }
    attributes.add(
        new Attribute("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        new Attribute(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(
        new Attribute(
            "following", String.valueOf(following)));
    // Deprecated
    attributes.add(new Attribute("max-waiting-changes",
                                  String.valueOf(maxQueueSize)));
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,8 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
      new DbHandler(
          (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +154,8 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
      new DbHandler(
          (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
    // Creates changes added to the dbHandler
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);