mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.20.2015 e4f80a3325b8ca06ad3823113b2e30054e471062
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -31,7 +31,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
@@ -234,19 +233,14 @@
  }
  /**
   * Get the next update that must be sent to the consumer
   * from the message queue or from the database.
   * Get the next update that must be sent to the consumer from the message queue or from the
   * database.
   *
   * @param connectedReplicaIds
   *            Ids of replicas to accept when returning a message from
   *            the late queue.
   * @param synchronous specifies what to do when the queue is empty.
   *         when true, the method blocks; when false the method return null.
   *
   * @return The next update that must be sent to the consumer.
   *         null when synchronous is false and queue is empty.
   * @param sendToServerId
   *          serverId of the replica where changes will be sent
   * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
   */
  protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
  protected UpdateMsg getNextMessage(int sendToServerId)
  {
    while (activeConsumer)
    {
@@ -279,7 +273,7 @@
           *           restart as usual
           *   load this change on the delayList
           */
          fillLateQueue(connectedReplicaIds);
          fillLateQueue(sendToServerId);
          if (lateQueue.isEmpty())
          {
            // we could not find any messages in the changelog
@@ -342,10 +336,6 @@
          {
            while (msgQueue.isEmpty() && following)
            {
              if (!synchronous)
              {
                return null;
              }
              msgQueue.wait(500);
              if (!activeConsumer)
              {
@@ -381,14 +371,14 @@
   * Fills the late queue with the most recent changes, accepting only the
   * messages from provided replica ids.
   */
  private void fillLateQueue(Set<Integer> connectedReplicaIds)
  private void fillLateQueue(int sendToServerId)
  {
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
      while (cursor.next() && isLateQueueBelowThreshold())
      {
        final UpdateMsg record = cursor.getRecord();
        if (connectedReplicaIds.contains(record.getCSN().getServerId()))
        if (record.getCSN().getServerId() != sendToServerId)
        {
          lateQueue.add(record);
        }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,7 +31,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -1265,21 +1264,6 @@
  }
  /**
   * Get the next update that need to be sent to a given LDAP server.
   * This call is blocking when no update is available or when dependencies
   * do not allow to send the next available change
   *
   * @param sHandler The server handler for the target directory server.
   *
   * @return the update that must be forwarded
   */
  public UpdateMsg take(ServerHandler sHandler)
  {
    // Next message can only be taken from connected DSs
    return sHandler.take(new HashSet<Integer>(getConnectedDSs().keySet()));
  }
  /**
   * Creates and returns a cursor across this replication domain.
   * <p>
   * Client code must call {@link DBCursor#next()} to advance the cursor to the
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -603,15 +602,6 @@
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode.
   */
  private void incrementAssuredSdSentUpdates()
  {
    assuredSdSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode that timed out.
   */
  public void incrementAssuredSdSentUpdatesTimeout()
@@ -639,15 +629,6 @@
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode.
   */
  private void incrementAssuredSrSentUpdates()
  {
    assuredSrSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode that timed out.
   */
  public void incrementAssuredSrSentUpdatesTimeout()
@@ -942,17 +923,31 @@
   * Select the next update that must be sent to the server managed by this
   * ServerHandler.
   *
   * @param connectedReplicaIds
   *          Ids of replicas to accept when returning a message.
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   */
  public UpdateMsg take(Set<Integer> connectedReplicaIds)
  public UpdateMsg take()
  {
    boolean interrupted = true;
    UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg
    final UpdateMsg msg = getNextMessage(serverId);
    acquirePermitInSendWindow();
    if (msg != null)
    {
      incrementOutCount();
      if (msg.isAssured())
      {
        incrementAssuredStats(msg);
      }
      return msg;
    }
    return null;
  }
  private void acquirePermitInSendWindow()
  {
    boolean acquired = false;
    boolean interrupted = true;
    do
    {
      try
@@ -964,22 +959,18 @@
        // loop until not interrupted
      }
    } while ((interrupted || !acquired) && !shutdownWriter);
    if (msg != null)
    {
      incrementOutCount();
  }
      if (msg.isAssured())
      {
        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
        {
          incrementAssuredSrSentUpdates();
        } else if (!isDataServer())
        {
          incrementAssuredSdSentUpdates();
        }
      }
  private void incrementAssuredStats(final UpdateMsg msg)
  {
    if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
    {
      assuredSrSentUpdates++;
    }
    return msg;
    else if (!isDataServer())
    {
      assuredSdSentUpdates++;
    }
  }
  /**
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2014 ForgeRock AS
 *      Portions Copyright 2011-2015 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -101,7 +101,7 @@
      while (!shutdown
          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
      {
        final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
        final UpdateMsg updateMsg = this.handler.take();
        if (updateMsg == null)
        {
          // this connection is closing