mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
25.02.2014 fe00a5949e3a06a8b20c5a84249594133e320226
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -50,12 +50,24 @@
 * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
 * MonitorMsg.
 */
public class MonitorMsg extends RoutableMsg
public class MonitorMsg extends ReplicationMsg
{
  /**
   * Data structure to manage the state and the approximation
   * of the data of the first missing change for each LDAP server
   * connected to a Replication Server.
   * The destination server or servers of this message.
   */
  private final int destination;
  /**
   * The serverID of the server that sends this message.
   */
  private final int senderID;
  /**
   * Data structure to manage the state and the approximation of the data of the
   * first missing change for each LDAP server connected to a Replication
   * Server.
   */
  static class ServerData
  {
@@ -89,24 +101,7 @@
   */
  public MonitorMsg(int sender, int destination)
  {
    super(sender, destination);
  }
  /**
   * Sets the sender ID.
   * @param senderID The sender ID.
   */
  public void setSenderID(int senderID)
  {
    this.senderID = senderID;
  }
  /**
   * Sets the destination.
   * @param destination The destination.
   */
  public void setDestination(int destination)
  {
    this.senderID = sender;
    this.destination = destination;
  }
@@ -459,6 +454,32 @@
    return data.rsStates.keySet().iterator();
  }
  /**
   * Get the destination.
   *
   * @return the destination
   */
  public int getDestination()
  {
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
   * @return the server id
   */
  public int getSenderID()
  {
    return senderID;
  }
  /**
   * {@inheritDoc}
   */
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -22,68 +22,91 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is part of the replication protocol.
 * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
 * informations.
 * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
 * MonitorMessage.
 * This message is part of the replication protocol. RS1 sends a
 * MonitorRequestMsg to RS2 to request its monitoring information. When RS2
 * receives a MonitorRequestMsg from RS1, RS2 responds with a MonitorMessage.
 */
public class MonitorRequestMsg extends RoutableMsg
public class MonitorRequestMsg extends ReplicationMsg
{
  /**
   * The destination server or servers of this message.
   */
  private final int destination;
  /**
   * The serverID of the server that sends this message.
   */
  private final int senderID;
  /**
   * Creates a message.
   *
   * @param serverID The sender server of this message.
   * @param destination The server or servers targeted by this message.
   * @param serverID
   *          The sender server of this message.
   * @param destination
   *          The server or servers targeted by this message.
   */
  public MonitorRequestMsg(int serverID, int destination)
  {
    super(serverID, destination);
    this.senderID = serverID;
    this.destination = destination;
  }
  /**
   * Creates a new message by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the message,
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   *
   * @param in
   *          A byte array containing the encoded information for the message,
   * @throws DataFormatException
   *           If the in does not contain a properly, encoded message.
   */
  public MonitorRequestMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderString);
      pos += length +1;
      pos += length + 1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
      pos += length + 1;
    } catch (UnsupportedEncodingException e)
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
@@ -95,8 +118,7 @@
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
@@ -117,4 +139,41 @@
      return null;
    }
  }
  /**
   * Get the destination.
   *
   * @return the destination
   */
  public int getDestination()
  {
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
   * @return the server id
   */
  public int getSenderID()
  {
    return senderID;
  }
  /**
   * Returns a string representation of the message.
   *
   * @return the string representation of this message.
   */
  public String toString()
  {
    return "[" + getClass().getCanonicalName() + " sender=" + senderID
        + " destination=" + destination + "]";
  }
}
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -22,13 +22,21 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
/**
 * This is an abstract class of messages of the replication protocol
 * for message that needs to contain information about the server that
 * send them and the destination servers to which they should be sent.
 * This is an abstract class of messages of the replication protocol for message
 * that needs to contain information about the server that send them and the
 * destination servers to which they should be sent.
 * <p>
 * Routable messages are used when initializing a new replica from an existing
 * replica: the total update messages are sent across the topology from the
 * source replica to the target replica, possibly traversing one or two
 * replication servers in the process (e.g. DS1 -&gt; RS1 -&gt; RS2 -&gt; DS2).
 */
public abstract class RoutableMsg extends ReplicationMsg
{
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -98,7 +98,6 @@
          {
            break;
          }
          monitorMsg.setDestination(serverHandler.getServerId());
          try
          {
            serverHandler.send(monitorMsg);
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -800,16 +800,6 @@
      }
    }
    // Update threshold value for status analyzers
    final int newThreshold = config.getDegradedStatusThreshold();
    if (oldConfig.getDegradedStatusThreshold() != newThreshold)
    {
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        domain.updateDegradedStatusThreshold(newThreshold);
      }
    }
    // Update period value for monitoring publishers
    if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
    {
@@ -941,7 +931,6 @@
  /**
   * Creates the backend associated to this replication server.
   * @throws ConfigException
   */
  private void createBackend() throws ConfigException
  {
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -75,12 +76,10 @@
  private final DN baseDN;
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
   * are late. Using an AtomicReference to avoid leaking references to costly
   * threads.
   * Periodically verifies whether the connected DSs are late and publishes any
   * pending status messages.
   */
  private AtomicReference<StatusAnalyzer> statusAnalyzer =
      new AtomicReference<StatusAnalyzer>();
  private final StatusAnalyzer statusAnalyzer;
  /**
   * The monitoring publisher that periodically sends monitoring messages to the
@@ -166,6 +165,98 @@
   */
  private int assuredTimeoutTimerPurgeCounter = 0;
  /**
   * Stores pending status messages such as DS change time heartbeats for future
   * forwarding to the rest of the topology. This class is required in order to
   * decouple inbound IO processing from outbound IO processing and avoid
   * potential inter-process deadlocks. In particular, the {@code ServerReader}
   * thread must not send messages.
   */
  private static class PendingStatusMessages
  {
    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private boolean sendRSTopologyMsg;
    private boolean sendDSTopologyMsg;
    private int excludedDSForTopologyMsg = -1;
    /**
     * Enqueues a TopologyMsg for all the connected directory servers in order
     * to let them know the topology (every known DSs and RSs).
     *
     * @param excludedDS
     *          If not null, the topology message will not be sent to this DS.
     */
    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
    {
      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
      if (sendDSTopologyMsg)
      {
        if (excludedServerId != excludedDSForTopologyMsg)
        {
          excludedDSForTopologyMsg = -1;
        }
      }
      else
      {
        sendDSTopologyMsg = true;
        excludedDSForTopologyMsg = excludedServerId;
      }
    }
    /**
     * Enqueues a TopologyMsg for all the connected replication servers in order
     * to let them know our connected LDAP servers.
     */
    private void enqueueTopoInfoToAllRSs()
    {
      sendRSTopologyMsg = true;
    }
    /**
     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
     * all other RS instances.
     *
     * @param msg
     *          The heartbeat message.
     */
    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
    {
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }
    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }
    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }
  }
  private final Object pendingStatusMessagesLock = new Object();
  /** @GuardedBy("pendingStatusMessagesLock") */
  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
@@ -184,7 +275,8 @@
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();
    this.statusAnalyzer = new StatusAnalyzer(this);
    this.statusAnalyzer.start();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -704,7 +796,7 @@
   * @param ack The ack message received.
   * @param ackingServer The server handler of the server that sent the ack.
   */
  public void processAck(AckMsg ack, ServerHandler ackingServer)
  void processAck(AckMsg ack, ServerHandler ackingServer)
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.
@@ -990,21 +1082,12 @@
        if (connectedRSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(sHandler, shutdown, false);
        } else if (connectedDSs.containsKey(sHandler.getServerId()))
        }
        else if (connectedDSs.containsKey(sHandler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (connectedDSs.size() == 1)
          {
            if (logger.isTraceEnabled())
            {
              debug("remote server " + sHandler
                  + " is the last DS to be stopped: stopping status analyzer");
            }
            stopStatusAnalyzer();
          }
          unregisterServerHandler(sHandler, shutdown, true);
        } else if (otherHandlers.contains(sHandler))
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
@@ -1038,15 +1121,19 @@
    resetGenerationIdIfPossible();
    if (!shutdown)
    {
      if (isDirectoryServer)
      synchronized (pendingStatusMessagesLock)
      {
        // Update the remote replication servers with our list
        // of connected LDAP servers
        sendTopoInfoToAllRSs();
        if (isDirectoryServer)
        {
          // Update the remote replication servers with our list
          // of connected LDAP servers
          pendingStatusMessages.enqueueTopoInfoToAllRSs();
        }
        // Warn our DSs that a RS or DS has quit (does not use this
        // handler as already removed from list)
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      // Warn our DSs that a RS or DS has quit (does not use this
      // handler as already removed from list)
      sendTopoInfoToAllDSsExcept(null);
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
@@ -1384,99 +1471,71 @@
    return servers;
  }
  /**
   * Processes a message coming from one server in the topology and potentially
   * forwards it to one or all other servers.
   *
   * @param msg
   *          The message received and to be processed.
   * @param msgEmitter
   *          The server handler of the server that emitted the message.
   * @param sender
   *          The server handler of the server that sent the message.
   */
  public void process(RoutableMsg msg, ServerHandler msgEmitter)
  void process(RoutableMsg msg, ServerHandler sender)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (!(msg instanceof InitializeRequestMsg) &&
        !(msg instanceof InitializeTargetMsg) &&
        !(msg instanceof InitializeRcvAckMsg) &&
        !(msg instanceof EntryMsg) &&
        !(msg instanceof DoneMsg) &&
        (msg.getDestination() == this.localReplicationServer.getServerId()))
    if (msg.getDestination() == localReplicationServer.getServerId())
    {
      // Handle routable messages targeted at this RS.
      if (msg instanceof ErrorMsg)
      {
        ErrorMsg errorMsg = (ErrorMsg) msg;
        logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
      } else if (msg instanceof MonitorRequestMsg)
      {
        replyWithTopologyMonitorMsg(msg, msgEmitter);
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        domainMonitor.receiveMonitorDataResponse(monitorMsg,
            msgEmitter.getServerId());
      } else
      {
        replyWithUnroutableMsgType(msgEmitter, msg);
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
    if (!servers.isEmpty())
    {
      forwardMsgToAllServers(msg, servers, msgEmitter);
      else
      {
        replyWithUnroutableMsgType(sender, msg);
      }
    }
    else
    {
      replyWithUnreachablePeerMsg(msgEmitter, msg);
      // Forward message not destined for this RS.
      List<ServerHandler> servers = getDestinationServers(msg, sender);
      if (!servers.isEmpty())
      {
        forwardMsgToAllServers(msg, servers, sender);
      }
      else
      {
        replyWithUnreachablePeerMsg(sender, msg);
      }
    }
  }
  private void replyWithTopologyMonitorMsg(RoutableMsg msg,
      ServerHandler msgEmitter)
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   * @param sender
   *          The DS/RS which sent the monitor request.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (msgEmitter.isDataServer())
    {
      // Monitoring information requested by a DS
      try
      {
        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
            msg.getDestination(), msg.getSenderID(),
            domainMonitor.getMonitorData());
        msgEmitter.send(monitorMsg);
      }
      catch (IOException e)
      {
        // the connection was closed.
      }
    }
    else
    {
      // Monitoring information requested by a RS
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
    enqueueMonitorMsg(msg, sender);
  }
      if (monitorMsg != null)
      {
        try
        {
          msgEmitter.send(monitorMsg);
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getDestination());
        }
      }
    }
  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message
   * @param sender
   *          The DS/RS which sent the monitor.
   */
  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
  {
    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
  }
  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1532,7 +1591,7 @@
  }
  private void forwardMsgToAllServers(RoutableMsg msg,
      List<ServerHandler> servers, ServerHandler msgEmitter)
      List<ServerHandler> servers, ServerHandler sender)
  {
    for (ServerHandler targetHandler : servers)
    {
@@ -1557,13 +1616,13 @@
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
        try
        {
          msgEmitter.send(errMsg);
          sender.send(errMsg);
        } catch (IOException ioe1)
        {
          // an error happened on the sender session trying to recover
          // from an error on the receiver session.
          // We don't have much solution left beside closing the sessions.
          stopServer(msgEmitter, false);
          stopServer(sender, false);
          stopServer(targetHandler, false);
        }
      // TODO Handle error properly (sender timeout in addition)
@@ -1629,42 +1688,26 @@
   * @return The newly created and filled MonitorMsg. Null if the current thread
   *         was interrupted while attempting to get the domain lock.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    try
    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
    monitorMsg.setReplServerDbState(getLatestServerState());
    // Add the server state for each connected DS and RS.
    for (DataServerHandler dsHandler : this.connectedDSs.values())
    {
      // Lock domain as we need to go through connected servers list
      lock();
    }
    catch (InterruptedException e)
    {
      return null;
      monitorMsg.setServerState(dsHandler.getServerId(),
          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
          true);
    }
    try
    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
    {
      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      monitorMsg.setReplServerDbState(getLatestServerState());
      // Add the server state for each connected DS and RS.
      for (DataServerHandler dsHandler : this.connectedDSs.values())
      {
        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
      }
      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
      }
      return monitorMsg;
      monitorMsg.setServerState(rsHandler.getServerId(),
          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
          false);
    }
    finally
    {
      release();
    }
    return monitorMsg;
  }
  /**
@@ -1678,6 +1721,7 @@
    assuredTimeoutTimer.cancel();
    stopAllServers(true);
    statusAnalyzer.shutdown();
  }
  /**
@@ -1701,79 +1745,7 @@
    return "ReplicationServerDomain " + baseDN;
  }
  /**
   * Send a TopologyMsg to all the connected directory servers in order to let
   * them know the topology (every known DSs and RSs).
   *
   * @param notThisOne
   *          If not null, the topology message will not be sent to this DS.
   */
  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
  {
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      if (dsHandler != notThisOne)
      // All except the supplied one
      {
        for (int i=1; i<=2; i++)
        {
          if (!dsHandler.shuttingDown()
              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            TopologyMsg topoMsg =
                createTopologyMsgForDS(dsHandler.getServerId());
            try
            {
              dsHandler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
            {
              if (i == 2)
              {
                logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "directory",
                    dsHandler.getServerId(), e.getMessage());
              }
            }
          }
          sleep(100);
        }
      }
    }
  }
  /**
   * Send a TopologyMsg to all the connected replication servers
   * in order to let them know our connected LDAP servers.
   */
  private void sendTopoInfoToAllRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
        if (!rsHandler.shuttingDown()
            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        {
          try
          {
            rsHandler.sendTopoInfo(topoMsg);
            break;
          }
          catch (IOException e)
          {
            if (i == 2)
            {
              logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "replication",
                  rsHandler.getServerId(), e.getMessage());
            }
          }
        }
        sleep(100);
      }
    }
  }
  /**
   * Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2031,7 +2003,7 @@
        return;
      }
      sendTopoInfoToAllExcept(senderHandler);
      enqueueTopoInfoToAllExcept(senderHandler);
      logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
          senderHandler.getServerId(), baseDN.toNormalizedString(), newStatus);
@@ -2053,7 +2025,7 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatus(DataServerHandler dsHandler,
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
@@ -2106,7 +2078,7 @@
        return false;
      }
      sendTopoInfoToAllExcept(dsHandler);
      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
@@ -2125,7 +2097,7 @@
   */
  public void sendTopoInfoToAll()
  {
    sendTopoInfoToAllExcept(null);
    enqueueTopoInfoToAllExcept(null);
  }
  /**
@@ -2134,10 +2106,14 @@
   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    sendTopoInfoToAllDSsExcept(dsHandler);
    sendTopoInfoToAllRSs();
    synchronized (pendingStatusMessagesLock)
    {
      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
      pendingStatusMessages.enqueueTopoInfoToAllRSs();
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
  /**
@@ -2253,7 +2229,11 @@
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      sendTopoInfoToAllDSsExcept(null);
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch(Exception e)
    {
@@ -2373,36 +2353,6 @@
  }
  /**
   * Starts the status analyzer for the domain if not already started.
   */
  private void startStatusAnalyzer()
  {
    int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread = new StatusAnalyzer(this);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
      }
    }
  }
  /**
   * Stops the status analyzer for the domain.
   */
  private void stopStatusAnalyzer()
  {
    final StatusAnalyzer thread = statusAnalyzer.get();
    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
    }
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  private void startMonitoringPublisher()
@@ -2569,62 +2519,62 @@
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
      if (!handler.shuttingDown()
          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
      {
        try
        {
          handler.sendTopoInfo(msg);
          break;
        }
        catch (IOException e)
        {
          if (i == 2)
          {
            logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
                baseDN.toNormalizedString(), type, handler.getServerId(),
                e.getMessage());
          }
        }
      }
      sleep(100);
    }
  }
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg)
  {
    try
    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    if (senderHandler.isDataServer())
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
    {
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      if (senderHandler.isDataServer())
      /*
       * If we are the first replication server warned, then forward the message
       * to the remote replication servers.
       */
      synchronized (pendingStatusMessagesLock)
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : connectedRSs.values())
        {
          try
          {
            if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
            {
              rsHandler.send(msg);
            }
          }
          catch (IOException e)
          {
            logger.traceException(e);
            logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG,
                "Replication Server " + localReplicationServer.getReplicationPort() + " "
                    + baseDN + " " + localReplicationServer.getServerId());
            stopServer(rsHandler, false);
          }
        }
        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
      }
    }
    finally
    {
      release();
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
@@ -2660,26 +2610,6 @@
  }
  /**
   * Update the status analyzer with the new threshold value.
   *
   * @param degradedStatusThreshold
   *          The new threshold value.
   */
  void updateDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (degradedStatusThreshold == 0)
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
    }
    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
    }
  }
  /**
   * Update the monitoring publisher with the new period value.
   *
   * @param period
@@ -2715,7 +2645,6 @@
   */
  public void register(DataServerHandler dsHandler)
  {
    startStatusAnalyzer();
    startMonitoringPublisher();
    // connected with new DS: store handler.
@@ -2723,7 +2652,7 @@
    // Tell peer RSs and DSs a new DS just connected to us
    // No need to re-send TopologyMsg to this just new DS
    sendTopoInfoToAllExcept(dsHandler);
    enqueueTopoInfoToAllExcept(dsHandler);
  }
  /**
@@ -2765,7 +2694,7 @@
    // test
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : getConnectedDSs().values())
      for (DataServerHandler serverHandler : connectedDSs.values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
@@ -2801,4 +2730,158 @@
      }
    }
  }
  /**
   * Sends any enqueued status messages to the rest of the topology.
   */
  void sendPendingStatusMessages()
  {
    /*
     * Take a snapshot of pending status notifications in order to avoid holding
     * the broadcast lock for too long. In addition, clear the notifications so
     * that they are not resent the next time.
     */
    final PendingStatusMessages savedState;
    synchronized (pendingStatusMessagesLock)
    {
      savedState = pendingStatusMessages;
      pendingStatusMessages = new PendingStatusMessages();
    }
    sendPendingChangeTimeHeartbeatMsgs(savedState);
    sendPendingTopologyMsgs(savedState);
    sendPendingMonitorMsgs(savedState);
  }
  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
  {
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
        .entrySet())
    {
      ServerHandler ds = connectedDSs.get(msg.getKey());
      if (ds != null)
      {
        try
        {
          ds.send(msg.getValue());
        }
        catch (IOException e)
        {
          // Ignore: connection closed.
        }
      }
    }
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
        .entrySet())
    {
      ServerHandler rs = connectedRSs.get(msg.getKey());
      if (rs != null)
      {
        try
        {
          rs.send(msg.getValue());
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          // FIXME: why do we log for RSs but not DSs?
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue()
              .getDestination());
        }
      }
    }
  }
  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
  {
    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
        .values())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
          {
            rsHandler.send(pendingHeartbeat);
          }
        }
        catch (IOException e)
        {
          logger.traceException(e);
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
              + localReplicationServer.getReplicationPort() + " " + baseDN
              + " " + localReplicationServer.getServerId());
          stopServer(rsHandler, false);
        }
      }
    }
  }
  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
  {
    if (pendingMsgs.sendDSTopologyMsg)
    {
      for (ServerHandler handler : connectedDSs.values())
      {
        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
        {
          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
              .getServerId());
          sendTopologyMsg("directory", handler, topoMsg);
        }
      }
    }
    if (pendingMsgs.sendRSTopologyMsg)
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
      {
        sendTopologyMsg("replication", handler, topoMsg);
      }
    }
  }
  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (sender.isDataServer())
    {
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(),
          domainMonitor.getMonitorData());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    else
    {
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
}
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,9 @@
package org.opends.server.replication.server;
import java.io.IOException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
@@ -45,6 +47,7 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
/**
@@ -485,7 +488,7 @@
   */
  public ReplicationServerDomain getDomain()
  {
    return this.replicationServerDomain;
    return replicationServerDomain;
  }
  /**
@@ -848,21 +851,45 @@
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMsg msg)
  void process(RoutableMsg msg)
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("In "
          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
          + this + " processes routable msg received:" + msg);
    }
    replicationServerDomain.process(msg, this);
  }
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg)
  {
    replicationServerDomain.processMonitorRequestMsg(msg, this);
  }
  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message.
   */
  void processMonitorMsg(MonitorMsg msg)
  {
    replicationServerDomain.processMonitorMsg(msg, this);
  }
  /**
   * Processes a change time heartbeat msg.
   *
   * @param msg The message to be processed.
   */
  public void process(ChangeTimeHeartbeatMsg msg)
  void process(ChangeTimeHeartbeatMsg msg)
  {
    if (logger.isTraceEnabled())
      logger.trace("In "
@@ -926,15 +953,6 @@
  }
  /**
   * Sets the replication server domain associated.
   * @param rsd The provided replication server domain.
   */
  protected void setReplicationServerDomain(ReplicationServerDomain rsd)
  {
    this.replicationServerDomain = rsd;
  }
  /**
   * Sets the window size when used when sending to the remote.
   * @param size The provided window size.
   */
@@ -1180,7 +1198,7 @@
   * Process a Ack message received.
   * @param ack the message received.
   */
  public void processAck(AckMsg ack)
  void processAck(AckMsg ack)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processAck(ack, this);
@@ -1201,7 +1219,7 @@
   * Process a ResetGenerationIdMsg message received.
   * @param msg the message received.
   */
  public void processResetGenId(ResetGenerationIdMsg msg)
  void processResetGenId(ResetGenerationIdMsg msg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.resetGenerationId(this, msg);
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerReader.java
@@ -174,8 +174,22 @@
          {
            handler.updateWindow((WindowMsg) msg);
          }
          else if (msg instanceof MonitorRequestMsg)
          {
            handler.processMonitorRequestMsg((MonitorRequestMsg) msg);
          }
          else if (msg instanceof MonitorMsg)
          {
            handler.processMonitorMsg((MonitorMsg) msg);
          }
          else if (msg instanceof RoutableMsg)
          {
            /*
             * Note that we handle monitor messages separately since they in
             * fact never need "routing" and are instead sent directly between
             * connected peers. Doing so allows us to more clearly decouple
             * write IO from the reader thread (see OPENDJ-1354).
             */
            handler.process((RoutableMsg) msg);
          }
          else if (msg instanceof ResetGenerationIdMsg)
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -42,8 +42,11 @@
 * the threshold is uncrossed, the status analyzer must make the DS status
 * change back to NORMAL_STATUS. To have meaning of status, please refer to
 * ServerStatus class.
 * <p>
 * In addition, this thread is responsible for publishing any pending status
 * messages.
 */
public class StatusAnalyzer extends DirectoryThread
class StatusAnalyzer extends DirectoryThread
{
  private static final LocalizedLogger logger = LocalizedLogger
      .getLoggerForThisClass();
@@ -52,9 +55,9 @@
  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private final ReplicationServerDomain replicationServerDomain;
  private final Object shutdownLock = new Object();
  private volatile boolean shutdown = false;
  private volatile boolean done = false;
  private final Object eventMonitor = new Object();
  private boolean pendingStatusMessage = false;
  private long nextCheckDSDegradedStatusTime;
@@ -64,13 +67,13 @@
   * @param replicationServerDomain
   *          The ReplicationServerDomain the status analyzer is for.
   */
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  {
    super("Replication server RS("
        + replicationServerDomain.getLocalRSServerId()
        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
        + ") status monitor for domain \""
        + replicationServerDomain.getBaseDN()
        + "\"");
    this.replicationServerDomain = replicationServerDomain;
  }
@@ -87,35 +90,50 @@
      logger.trace(getMessage("Directory server status analyzer starting."));
    }
    while (!shutdown)
    try
    {
      synchronized (shutdownLock)
      while (true)
      {
        if (!shutdown)
        final boolean requestStatusBroadcastWasRequested;
        synchronized (eventMonitor)
        {
          try
          if (!isShutdownInitiated() && !pendingStatusMessage)
          {
            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
          }
          catch (InterruptedException e)
          {
            // Server shutdown monitor may interrupt slow threads.
            logger.traceException(e);
            shutdown = true;
            break;
          }
          requestStatusBroadcastWasRequested = pendingStatusMessage;
          pendingStatusMessage = false;
        }
        if (isShutdownInitiated())
        {
          break;
        }
        // Broadcast heartbeats, topology messages, etc if requested.
        if (requestStatusBroadcastWasRequested)
        {
          replicationServerDomain.sendPendingStatusMessages();
        }
        /*
         * Check the degraded status for connected DS instances only if
         * sufficient time has passed. The current time is not cached because
         * the call to checkDSDegradedStatus may take some time.
         */
        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
        {
          replicationServerDomain.checkDSDegradedStatus();
          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
              + STATUS_ANALYZER_SLEEP_TIME;
        }
      }
      if (shutdown)
      {
        break;
      }
      replicationServerDomain.checkDSDegradedStatus();
    }
    catch (InterruptedException e)
    {
      // Forcefully stopped.
    }
    done = true;
    logger.trace(getMessage("Status analyzer is terminated."));
  }
@@ -133,46 +151,45 @@
  /**
   * Stops the thread.
   */
  public void shutdown()
  void shutdown()
  {
    synchronized (shutdownLock)
    initiateShutdown();
    if (logger.isTraceEnabled())
    {
      shutdown = true;
      shutdownLock.notifyAll();
      if (logger.isTraceEnabled())
      {
        logger.trace(getMessage("Shutting down status analyzer."));
      }
      logger.trace(getMessage("Shutting down status analyzer."));
    }
    synchronized (eventMonitor)
    {
      eventMonitor.notifyAll();
    }
    try
    {
      join(2000);
    }
    catch (InterruptedException e)
    {
      // Trapped: forcefully stop the thread.
    }
    if (isAlive())
    {
      // The join timed out or was interrupted so attempt to forcefully stop the
      // analyzer.
      interrupt();
    }
  }
  /**
   * Waits for analyzer death. If not finished within 2 seconds, forces
   * interruption
   * Requests that a topology state related message be broadcast to the rest of
   * the topology. Messages include DS heartbeats, topology information, etc.
   */
  public void waitForShutdown()
  void notifyPendingStatusMessage()
  {
    try
    synchronized (eventMonitor)
    {
      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
      int n = 0;
      while (!done && this.isAlive())
      {
        Thread.sleep(50);
        n++;
        if (n >= FACTOR)
        {
          logger.trace(getMessage("Interrupting status analyzer."));
          interrupt();
        }
      }
    }
    catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
      pendingStatusMessage = true;
      eventMonitor.notifyAll();
    }
  }
}