mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
28.11.2014 030cd9b2dbc0d7016114f75e8b93190e34d1e213
OPENDJ-1611 CR-5492 Filter replicaIds when filling late queue in MessageHandler
3 files modified
49 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/MessageHandler.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 19 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
@@ -255,12 +256,16 @@
   * Get the next update that must be sent to the consumer
   * from the message queue or from the database.
   *
   * @param  synchronous specifies what to do when the queue is empty.
   * @param connectedReplicaIds
   *            Ids of replicas to accept when returning a message from
   *            the late queue.
   * @param synchronous specifies what to do when the queue is empty.
   *         when true, the method blocks; when false the method return null.
   *
   * @return The next update that must be sent to the consumer.
   *         null when synchronous is false and queue is empty.
   */
  protected UpdateMsg getNextMessage(boolean synchronous)
  protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
  {
    while (activeConsumer)
    {
@@ -293,7 +298,7 @@
           *           restart as usual
           *   load this change on the delayList
           */
          fillLateQueue();
          fillLateQueue(connectedReplicaIds);
          if (lateQueue.isEmpty())
          {
            // we could not find any messages in the changelog
@@ -387,7 +392,11 @@
    return null;
  }
  private void fillLateQueue()
  /**
   * Fills the late queue with the most recent changes, accepting only the
   * messages from provided replica ids.
   */
  private void fillLateQueue(Set<Integer> connectedReplicaIds)
  {
    DBCursor<UpdateMsg> cursor = null;
    try
@@ -395,7 +404,11 @@
      cursor = replicationServerDomain.getCursorFrom(serverState);
      while (cursor.next() && isLateQueueBelowThreshold())
      {
        lateQueue.add(cursor.getRecord());
        final UpdateMsg record = cursor.getRecord();
        if (connectedReplicaIds.contains(record.getCSN().getServerId()))
        {
          lateQueue.add(record);
        }
      }
    }
    catch (ChangelogException e)
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,9 +31,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@@ -1291,15 +1293,14 @@
   */
  public UpdateMsg take(ServerHandler sHandler)
  {
    /*
     * Get the balanced tree that we use to sort the changes to be
     * sent to the replica from the cookie
     *
     * The next change to send is always the first one in the tree
     * So this methods simply need to check that dependencies are OK
     * and update this replicaId RUV
     */
    return sHandler.take();
    // Next message can only be taken from connected DSs
    final Set<Integer> connectedReplicaIds = new HashSet<Integer>(getConnectedDSs().keySet());
    if (sHandler.isDataServer())
    {
      // Prevents sending to a DS its own messages
      connectedReplicaIds.remove(sHandler.getServerId());
    }
    return sHandler.take(connectedReplicaIds);
  }
  /**
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -960,13 +961,15 @@
   * Select the next update that must be sent to the server managed by this
   * ServerHandler.
   *
   * @param connectedReplicaIds
   *          Ids of replicas to accept when returning a message.
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   */
  public UpdateMsg take()
  public UpdateMsg take(Set<Integer> connectedReplicaIds)
  {
    boolean interrupted = true;
    UpdateMsg msg = getNextMessage(true); // synchronous:block until msg
    UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg
    boolean acquired = false;
    do