mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.20.2015 e4f80a3325b8ca06ad3823113b2e30054e471062
OPENDJ-2106 (CR-7228) Entry is not replicated on second instance in 2 RS topology

A bit of context:
This code is executed by the RS which sends changes to a connected DS.

Fixed the logic in MessageHandler.fillLateQueue() which seemed flawed and even dangerous (because the connected DSs collection changes dynamically with the topology, and DSs can change their preferred RS). This is inherited from the 2.6 code worked: it did not have a global view of the changelog and specifically opened cursors for the connected DSs.
Given a replication change, the only DS we do not want to send it to is the originating DS itself.


MessageHandler.java:
In getNextMessage(), replaced the parameter Set<Integer> connectedReplicaIds with int sendToServerId and removed synchronous parameter (useless).

ServerHandler.java:
In take(), removed "Set<Integer> connectedReplicaIds" parameter.
Extracted methods acquirePermitInSendWindow() and incrementAssuredStats().
Inlined incrementAssuredSdSentUpdates() and incrementAssuredSrSentUpdates().

ReplicationServerDomain.java, ServerWriter.java:
Inlined ReplicationServerDomain.take().
4 files modified
115 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java 28 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java 16 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java 67 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -31,7 +31,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
@@ -234,19 +233,14 @@
  }
  /**
   * Get the next update that must be sent to the consumer
   * from the message queue or from the database.
   * Get the next update that must be sent to the consumer from the message queue or from the
   * database.
   *
   * @param connectedReplicaIds
   *            Ids of replicas to accept when returning a message from
   *            the late queue.
   * @param synchronous specifies what to do when the queue is empty.
   *         when true, the method blocks; when false the method return null.
   *
   * @return The next update that must be sent to the consumer.
   *         null when synchronous is false and queue is empty.
   * @param sendToServerId
   *          serverId of the replica where changes will be sent
   * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
   */
  protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
  protected UpdateMsg getNextMessage(int sendToServerId)
  {
    while (activeConsumer)
    {
@@ -279,7 +273,7 @@
           *           restart as usual
           *   load this change on the delayList
           */
          fillLateQueue(connectedReplicaIds);
          fillLateQueue(sendToServerId);
          if (lateQueue.isEmpty())
          {
            // we could not find any messages in the changelog
@@ -342,10 +336,6 @@
          {
            while (msgQueue.isEmpty() && following)
            {
              if (!synchronous)
              {
                return null;
              }
              msgQueue.wait(500);
              if (!activeConsumer)
              {
@@ -381,14 +371,14 @@
   * Fills the late queue with the most recent changes, accepting only the
   * messages from provided replica ids.
   */
  private void fillLateQueue(Set<Integer> connectedReplicaIds)
  private void fillLateQueue(int sendToServerId)
  {
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
      while (cursor.next() && isLateQueueBelowThreshold())
      {
        final UpdateMsg record = cursor.getRecord();
        if (connectedReplicaIds.contains(record.getCSN().getServerId()))
        if (record.getCSN().getServerId() != sendToServerId)
        {
          lateQueue.add(record);
        }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,7 +31,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -1265,21 +1264,6 @@
  }
  /**
   * Get the next update that need to be sent to a given LDAP server.
   * This call is blocking when no update is available or when dependencies
   * do not allow to send the next available change
   *
   * @param sHandler The server handler for the target directory server.
   *
   * @return the update that must be forwarded
   */
  public UpdateMsg take(ServerHandler sHandler)
  {
    // Next message can only be taken from connected DSs
    return sHandler.take(new HashSet<Integer>(getConnectedDSs().keySet()));
  }
  /**
   * Creates and returns a cursor across this replication domain.
   * <p>
   * Client code must call {@link DBCursor#next()} to advance the cursor to the
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -603,15 +602,6 @@
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode.
   */
  private void incrementAssuredSdSentUpdates()
  {
    assuredSdSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode that timed out.
   */
  public void incrementAssuredSdSentUpdatesTimeout()
@@ -639,15 +629,6 @@
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode.
   */
  private void incrementAssuredSrSentUpdates()
  {
    assuredSrSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode that timed out.
   */
  public void incrementAssuredSrSentUpdatesTimeout()
@@ -942,17 +923,31 @@
   * Select the next update that must be sent to the server managed by this
   * ServerHandler.
   *
   * @param connectedReplicaIds
   *          Ids of replicas to accept when returning a message.
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   */
  public UpdateMsg take(Set<Integer> connectedReplicaIds)
  public UpdateMsg take()
  {
    boolean interrupted = true;
    UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg
    final UpdateMsg msg = getNextMessage(serverId);
    acquirePermitInSendWindow();
    if (msg != null)
    {
      incrementOutCount();
      if (msg.isAssured())
      {
        incrementAssuredStats(msg);
      }
      return msg;
    }
    return null;
  }
  private void acquirePermitInSendWindow()
  {
    boolean acquired = false;
    boolean interrupted = true;
    do
    {
      try
@@ -964,22 +959,18 @@
        // loop until not interrupted
      }
    } while ((interrupted || !acquired) && !shutdownWriter);
    if (msg != null)
    {
      incrementOutCount();
  }
      if (msg.isAssured())
      {
        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
        {
          incrementAssuredSrSentUpdates();
        } else if (!isDataServer())
        {
          incrementAssuredSdSentUpdates();
        }
      }
  private void incrementAssuredStats(final UpdateMsg msg)
  {
    if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
    {
      assuredSrSentUpdates++;
    }
    return msg;
    else if (!isDataServer())
    {
      assuredSdSentUpdates++;
    }
  }
  /**
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2014 ForgeRock AS
 *      Portions Copyright 2011-2015 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -101,7 +101,7 @@
      while (!shutdown
          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
      {
        final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
        final UpdateMsg updateMsg = this.handler.take();
        if (updateMsg == null)
        {
          // this connection is closing