mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.40.2008 66f48672bc7953e77364a9a7ae41f1e70d83534f
Fix for issue 3477 : OpenDS runs out of synchronization or crashes

This problem happens because the cache of messages kept by the
Replication Servers are limited only by the a number of messages.

In the particular case of this tests, the entries used are very large
and modifications are done using large attribute values.
This cause the Replication Server caches to become too large and
the JVM runs out of memory.

The solution is to account not only for the number of messages but
also for the total number of bytes in the queues and to stop
caching messages in the queue when the maximum size is reached.
11 files modified
193 ■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/AddMsg.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 57 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MsgQueue.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 4 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 43 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -297,4 +297,13 @@
  {
    return parentUniqueId;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    return encodedAttributes.length + 40;
  }
}
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -122,4 +122,15 @@
  {
    return ("DEL " + getDn() + " " + getChangeNumber());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    // The DeleteMsg size is mostly dependent on the DN and should never
    // grow very large. It is therefore safe to assume an average of 40 bytes.
    return 40;
  }
}
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -360,4 +360,15 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    // The MODDN message size are mainly dependent on the
    // size of the DN. let's assume that they average on 100 bytes
    return 100;
  }
}
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -235,4 +235,16 @@
  {
    return("MOD " + getDn() + " " + getChangeNumber());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int size()
  {
    // The ModifyMsh can be very large when added or deleted attribute
    // values are very large. We therefore need to count the
    // whole encoded msg.
    return encodedMsg.length;
  }
}
opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -362,4 +362,11 @@
    }
  }
  /**
   * Return the number of bytes used by this message.
   *
   * @return The number of bytes used by this message.
   */
  public abstract int size();
}
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@
  //
  private final LinkedList<UpdateMessage> msgQueue =
    new LinkedList<UpdateMessage>();
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueHimark = 5000;
  int queueLowmark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueHimarkBytes = 100 * queueHimark;
  int queueLowmarkBytes = 100 * queueLowmark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
  private ChangeNumber lastChange = null;
@@ -93,14 +109,6 @@
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  MSG_QUEUE_HIMARK and will resume
  // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
  final static int MSG_QUEUE_HIMARK = 5000;
  final static int MSG_QUEUE_LOWMARK = 4000;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
@@ -120,16 +128,22 @@
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @param queueSize The queueSize to use when creating the dbHandler.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
  public DbHandler(
      short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, int queueSize)
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.serverId = id;
    serverId = id;
    this.baseDn = baseDn;
    this.trimage = replicationServer.getTrimage();
    trimage = replicationServer.getTrimage();
    queueHimark = queueSize;
    queueLowmark = queueSize * 4 / 5;
    queueHimarkBytes = 100 * queueHimark;
    queueLowmarkBytes = 100 * queueLowmark;
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
@@ -156,7 +170,7 @@
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      while (size > MSG_QUEUE_HIMARK)
      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
      {
        try
        {
@@ -168,6 +182,7 @@
        size = msgQueue.size();
      }
      queueByteSize += update.size();
      msgQueue.add(update);
      if (lastChange == null || lastChange.older(update.getChangeNumber()))
      {
@@ -308,10 +323,12 @@
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        msgQueue.remove();
        UpdateMessage msg = msgQueue.remove();
        queueByteSize -= msg.size();
        current++;
      }
      if (msgQueue.size() < MSG_QUEUE_LOWMARK)
      if ((msgQueue.size() < queueLowmark) &&
          (queueByteSize < queueLowmarkBytes))
        msgQueue.notify();
    }
  }
@@ -475,13 +492,14 @@
  private void flush()
  {
    int size;
    int chunksize = (500 < queueHimark ? 500 : queueHimark);
    do
    {
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(500);
        List<UpdateMessage> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
        if ((changes == null) || ((size = changes.size()) == 0))
@@ -493,7 +511,7 @@
        // remove the changes from the list of changes to be saved.
        clearQueue(changes.size());
      }
    } while (size >=500);
    } while (size >= chunksize);
  }
  /**
@@ -529,6 +547,10 @@
        attributes.add(new Attribute("last-change",
            lastChange.toString() + " " + lastTime.toString()));
      }
      attributes.add(
          new Attribute("queue-size", String.valueOf(msgQueue.size())));
      attributes.add(
          new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
      return attributes;
    }
@@ -604,6 +626,7 @@
    synchronized(flushLock)
    {
      msgQueue.clear();
      queueByteSize = 0;
    }
    db.clear();
    firstChange = db.readFirstChange();
opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -42,6 +42,9 @@
  private SortedMap<ChangeNumber, UpdateMessage>  map =
    new TreeMap<ChangeNumber, UpdateMessage>();
  // The total number of bytes for all the message in the queue.
  private int bytesCount = 0;
  /**
   * Return the first UpdateMessage in the MsgQueue.
   *
@@ -67,12 +70,22 @@
   *
   * @return The number of elements in this MsgQueue.
   */
  public int size()
  public int count()
  {
    return map.size();
  }
  /**
   * Returns the number of bytes in this MsgQueue.
   *
   * @return The number of bytes in this MsgQueue.
   */
  public int bytesCount()
  {
    return bytesCount;
  }
  /**
   * Returns <tt>true</tt> if this MsgQueue contains no UpdateMessage.
   *
   * @return <tt>true</tt> if this MsgQueue contains no UpdateMessage.
@@ -91,6 +104,7 @@
  public void add(UpdateMessage update)
  {
    map.put(update.getChangeNumber(), update);
    bytesCount += update.size();
  }
  /**
@@ -102,6 +116,7 @@
  {
    UpdateMessage msg = map.get(map.firstKey());
    map.remove(msg.getChangeNumber());
    bytesCount -= msg.size();
    return msg;
  }
@@ -126,5 +141,6 @@
  public void clear()
  {
    map.clear();
    bytesCount = 0;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -257,7 +257,9 @@
              + " serverId=" + serverId);
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this);
            new DbHandler(
                serverId, baseDn, replicationServer, this,
                replicationServer.getQueueSize());
          replicationServer.getReplicationServerDomain(baseDn, true).
          setDbHandler(serverId, dbHandler);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -134,7 +134,7 @@
  // ID of the backend
  private static final String backendId = "replicationChanges";
  // At startup, the listen thread wait on this flag for the connet
  // At startup, the listen thread wait on this flag for the connect
  // thread to look for other servers in the topology.
  private boolean connectedInTopology = false;
  private final Object connectedInTopologyLock = new Object();
@@ -554,7 +554,7 @@
  public DbHandler newDbHandler(short id, DN baseDn)
  throws DatabaseException
  {
    return new DbHandler(id, baseDn, this, dbEnv);
    return new DbHandler(id, baseDn, this, dbEnv, queueSize);
  }
  /**
@@ -827,6 +827,17 @@
  }
  /**
   * Get the queueSize for this replication server.
   *
   * @return The maximum size of the queues for this Replication Server
   *
   */
  public int getQueueSize()
  {
    return queueSize;
  }
  /**
   * Creates the backend associated to this replication server.
   * @throws ConfigException
   */
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 10000;
  private int maxQueueSize = 5000;
  private int maxQueueBytesSize = maxQueueSize * 100;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
@@ -115,8 +116,8 @@
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controled and should
                                       // be stopped from sending messsages.
                                       // flow controlled and should
                                       // be stopped from sending messages.
  private int saturationCount = 0;
  private short replicationServerId;
@@ -165,6 +166,7 @@
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
    this.protocolVersion = ProtocolVersion.currentVersion();
  }
@@ -305,7 +307,7 @@
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the negociation */
        /* Until here session is encrypted then it depends on the negotiation */
        if (!sslEncryption)
        {
          session.stopEncryption();
@@ -352,7 +354,7 @@
        }
        else
        {
          // We are an empty Replicationserver
          // We are an empty Replication Server
          if ((generationId>0)&&(!serverState.isEmpty()))
          {
            // If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
  {
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      int size = msgQueue.count();
      if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
        return true;
@@ -764,7 +766,7 @@
  {
    synchronized (msgQueue)
    {
      int queueSize = msgQueue.size();
      int queueSize = msgQueue.count();
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.size();
       return msgQueue.count();
     else
     {
       /*
@@ -1034,7 +1036,8 @@
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while (msgQueue.size() > maxQueueSize)
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      {
        setFollowing(false);
        msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change accross all servers.
          // change across all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          while (!iteratorSortedSet.isEmpty() &&
                 (lateQueue.count()<100) &&
                 (lateQueue.bytesCount()<50000) )
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
          {
            synchronized (msgQueue)
            {
              if (msgQueue.size() < maxQueueSize)
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              {
                setFollowing(true);
              }
@@ -1203,7 +1209,7 @@
            {
              if (msgQueue.contains(msg))
              {
                /* we finally catched up with the regular queue */
                /* we finally catch up with the regular queue */
                setFollowing(true);
                lateQueue.clear();
                UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
   * Update the serverState with the last message sent.
   *
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningfull.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean  updateServerState(UpdateMessage msg)
  {
@@ -1599,6 +1605,15 @@
      }
    }
    attributes.add(
        new Attribute("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        new Attribute(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(
        new Attribute(
            "following", String.valueOf(following)));
    // Deprecated
    attributes.add(new Attribute("max-waiting-changes",
                                  String.valueOf(maxQueueSize)));
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,8 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
      new DbHandler(
          (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +154,8 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
      new DbHandler(
          (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
    // Creates changes added to the dbHandler
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);