From b875ab3f7b327f797ec4532015e45da6ae3fff56 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 08 Apr 2014 09:09:25 +0000
Subject: [PATCH] Backport fix for OPENDJ-1354: replication threads BLOCKED in pendingChanges queue
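
The ServerReader threads could be forced to perform outbound IO, for
example when forwarding change time heartbeats or topology messages to
remote servers, which could block them and leave replication threads
BLOCKED in the pendingChanges queue.

This backport decouples inbound IO processing from outbound IO
processing: inbound threads now enqueue status messages (change time
heartbeats, topology and monitoring messages) into a per-domain
PendingStatusMessages object, and the repurposed StatusAnalyzer thread
is responsible for broadcasting them to the rest of the topology.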
---
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 67 ++-
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java | 93 +++-
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java | 12
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java | 3
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 11
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 767 +++++++++++++++++++++---------------
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 44 +
opends/src/server/org/opends/server/replication/server/ServerReader.java | 17
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 237 ++++------
9 files changed, 710 insertions(+), 541 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 9b55637..a25690c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -50,12 +50,24 @@
* When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
* MonitorMsg.
*/
-public class MonitorMsg extends RoutableMsg
+public class MonitorMsg extends ReplicationMsg
{
/**
- * Data structure to manage the state and the approximation
- * of the data of the first missing change for each LDAP server
- * connected to a Replication Server.
+ * The destination server or servers of this message.
+ */
+ private final int destination;
+
+ /**
+ * The serverID of the server that sends this message.
+ */
+ private final int senderID;
+
+
+
+ /**
+ * Data structure to manage the state and the approximation of the data of the
+ * first missing change for each LDAP server connected to a Replication
+ * Server.
*/
static class ServerData
{
@@ -89,24 +101,7 @@
*/
public MonitorMsg(int sender, int destination)
{
- super(sender, destination);
- }
-
- /**
- * Sets the sender ID.
- * @param senderID The sender ID.
- */
- public void setSenderID(int senderID)
- {
- this.senderID = senderID;
- }
-
- /**
- * Sets the destination.
- * @param destination The destination.
- */
- public void setDestination(int destination)
- {
+ this.senderID = sender;
this.destination = destination;
}
@@ -459,6 +454,32 @@
return data.rsStates.keySet().iterator();
}
+
+
+ /**
+ * Get the destination.
+ *
+ * @return the destination
+ */
+ public int getDestination()
+ {
+ return destination;
+ }
+
+
+
+ /**
+ * Get the server ID of the server that sent this message.
+ *
+ * @return the server id
+ */
+ public int getSenderID()
+ {
+ return senderID;
+ }
+
+
+
/**
* {@inheritDoc}
*/
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index baf8e26..cfe790c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -30,55 +30,70 @@
import java.util.zip.DataFormatException;
/**
- * This message is part of the replication protocol.
- * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
- * informations.
- * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
- * MonitorMessage.
+ * This message is part of the replication protocol. RS1 sends a
+ * MonitorRequestMsg to RS2 to request its monitoring information. When RS2
+ * receives a MonitorRequestMsg from RS1, RS2 responds with a MonitorMsg.
*/
-public class MonitorRequestMsg extends RoutableMsg
+public class MonitorRequestMsg extends ReplicationMsg
{
/**
+ * The destination server or servers of this message.
+ */
+ private final int destination;
+
+ /**
+ * The serverID of the server that sends this message.
+ */
+ private final int senderID;
+
+
+
+ /**
* Creates a message.
*
- * @param serverID The sender server of this message.
- * @param destination The server or servers targeted by this message.
+ * @param serverID
+ * The sender server of this message.
+ * @param destination
+ * The server or servers targeted by this message.
*/
public MonitorRequestMsg(int serverID, int destination)
{
- super(serverID, destination);
+ this.senderID = serverID;
+ this.destination = destination;
}
/**
* Creates a new message by decoding the provided byte array.
- * @param in A byte array containing the encoded information for the message,
- * @throws DataFormatException If the in does not contain a properly,
- * encoded message.
+ *
+ * @param in
+ * A byte array containing the encoded information for the message.
+ * @throws DataFormatException
+ * If the byte array does not contain a properly encoded message.
*/
public MonitorRequestMsg(byte[] in) throws DataFormatException
{
- super();
try
{
// First byte is the type
if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
+ throw new DataFormatException("input is not a valid "
+ + this.getClass().getCanonicalName());
int pos = 1;
// sender
int length = getNextLength(in, pos);
String senderString = new String(in, pos, length, "UTF-8");
this.senderID = Integer.valueOf(senderString);
- pos += length +1;
+ pos += length + 1;
// destination
length = getNextLength(in, pos);
String destinationString = new String(in, pos, length, "UTF-8");
this.destination = Integer.valueOf(destinationString);
- pos += length +1;
+ pos += length + 1;
- } catch (UnsupportedEncodingException e)
+ }
+ catch (UnsupportedEncodingException e)
{
throw new DataFormatException("UTF-8 is not supported by this jvm.");
}
@@ -95,8 +110,7 @@
byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
- int length = 1 + senderBytes.length + 1
- + destinationBytes.length + 1;
+ int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
byte[] resultByteArray = new byte[length];
@@ -117,4 +131,41 @@
return null;
}
}
+
+
+
+ /**
+ * Get the destination.
+ *
+ * @return the destination
+ */
+ public int getDestination()
+ {
+ return destination;
+ }
+
+
+
+ /**
+ * Get the server ID of the server that sent this message.
+ *
+ * @return the server id
+ */
+ public int getSenderID()
+ {
+ return senderID;
+ }
+
+
+
+ /**
+ * Returns a string representation of the message.
+ *
+ * @return the string representation of this message.
+ */
+ public String toString()
+ {
+ return "[" + getClass().getCanonicalName() + " sender=" + senderID
+ + " destination=" + destination + "]";
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
index 345a00c..74608e1 100644
--- a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -22,13 +22,19 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
/**
- * This is an abstract class of messages of the replication protocol
- * for message that needs to contain information about the server that
- * send them and the destination servers to which they should be sent.
+ * This is an abstract class for replication protocol messages that need to
+ * contain information about the server that sent them and the destination
+ * servers to which they should be sent.
+ * <p>
+ * Routable messages are used when initializing a new replica from an existing
+ * replica: the total update messages are sent across the topology from the
+ * source replica to the target replica, possibly traversing one or two
+ * replication servers in the process (e.g. DS1 -> RS1 -> RS2 -> DS2).
*/
public abstract class RoutableMsg extends ReplicationMsg
{
diff --git a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index 111d635..1a2b514 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -104,7 +104,6 @@
{
break;
}
- monitorMsg.setDestination(serverHandler.getServerId());
try
{
serverHandler.send(monitorMsg);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index bf00104..69a16d8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -829,16 +829,6 @@
}
}
- // Update threshold value for status analyzers
- final int newThreshold = config.getDegradedStatusThreshold();
- if (oldConfig.getDegradedStatusThreshold() != newThreshold)
- {
- for (ReplicationServerDomain domain : getReplicationServerDomains())
- {
- domain.updateDegradedStatusThreshold(newThreshold);
- }
- }
-
// Update period value for monitoring publishers
if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
{
@@ -970,7 +960,6 @@
/**
* Creates the backend associated to this replication server.
- * @throws ConfigException
*/
private void createBackend() throws ConfigException
{
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9121ec8..67ffa6c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,13 +22,14 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -53,6 +54,8 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -76,12 +79,10 @@
private final DN baseDN;
/**
- * The Status analyzer that periodically verifies whether the connected DSs
- * are late. Using an AtomicReference to avoid leaking references to costly
- * threads.
+ * Periodically verifies whether the connected DSs are late and publishes any
+ * pending status messages.
*/
- private AtomicReference<StatusAnalyzer> statusAnalyzer =
- new AtomicReference<StatusAnalyzer>();
+ private final StatusAnalyzer statusAnalyzer;
/**
* The monitoring publisher that periodically sends monitoring messages to the
@@ -168,6 +169,96 @@
private int assuredTimeoutTimerPurgeCounter = 0;
/**
+ * Stores pending status messages such as DS change time heartbeats for future
+ * forwarding to the rest of the topology. This class is required in order to
+ * decouple inbound IO processing from outbound IO processing and avoid
+ * potential inter-process deadlocks. In particular, the {@code ServerReader}
+ * thread must not send messages.
+ */
+ private static class PendingStatusMessages
+ {
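+ // Each map is keyed on server ID, so only the most recent pending
+ // message per server is retained.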
+ private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
+ new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
+ private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
+ new HashMap<Integer, MonitorMsg>(1);
+ private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
+ new HashMap<Integer, MonitorMsg>(1);
+ private boolean sendRSTopologyMsg;
+ private boolean sendDSTopologyMsg;
+ private int excludedDSForTopologyMsg = -1;
+
+
+
+ /**
+ * Enqueues a TopologyMsg for all the connected directory servers in order
+ * to let them know the topology (every known DSs and RSs).
+ *
+ * @param excludedDS
+ * If not null, the topology message will not be sent to this DS.
+ */
+ private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
+ {
+ int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
+ if (sendDSTopologyMsg)
+ {
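+ // A topology message is already pending: if the new exclusion differs
+ // from the pending one, drop the exclusion so that every DS receives
+ // the message.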
+ if (excludedServerId != excludedDSForTopologyMsg)
+ {
+ excludedDSForTopologyMsg = -1;
+ }
+ }
+ else
+ {
+ sendDSTopologyMsg = true;
+ excludedDSForTopologyMsg = excludedServerId;
+ }
+ }
+
+
+
+ /**
+ * Enqueues a TopologyMsg for all the connected replication servers in order
+ * to let them know our connected LDAP servers.
+ */
+ private void enqueueTopoInfoToAllRSs()
+ {
+ sendRSTopologyMsg = true;
+ }
+
+
+
+ /**
+ * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
+ * all other RS instances.
+ *
+ * @param msg
+ * The heartbeat message.
+ */
+ private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
+ {
+ pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
+ }
+
+
+
+ private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
+ {
+ pendingDSMonitorMsgs.put(dsServerId, msg);
+ }
+
+
+
+ private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
+ {
+ pendingRSMonitorMsgs.put(rsServerId, msg);
+ }
+ }
+
+ private final Object pendingStatusMessagesLock = new Object();
+
+ /** @GuardedBy("pendingStatusMessagesLock") */
+ private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
+
+ /**
* Creates a new ReplicationServerDomain associated to the baseDN.
*
* @param baseDN
@@ -185,7 +276,8 @@
+ ") assured timer for domain \"" + baseDN + "\"", true);
this.domainDB =
localReplicationServer.getChangelogDB().getReplicationDomainDB();
-
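+ // The status analyzer now runs for the lifetime of the domain rather
+ // than being started and stopped as DSs connect and disconnect.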
+ this.statusAnalyzer = new StatusAnalyzer(this);
+ this.statusAnalyzer.start();
DirectoryServer.registerMonitorProvider(this);
}
@@ -712,7 +804,7 @@
* @param ack The ack message received.
* @param ackingServer The server handler of the server that sent the ack.
*/
- public void processAck(AckMsg ack, ServerHandler ackingServer)
+ void processAck(AckMsg ack, ServerHandler ackingServer)
{
// Retrieve the expected acks info for the update matching the original
// sent update.
@@ -1003,21 +1095,12 @@
if (connectedRSs.containsKey(sHandler.getServerId()))
{
unregisterServerHandler(sHandler, shutdown, false);
- } else if (connectedDSs.containsKey(sHandler.getServerId()))
+ }
+ else if (connectedDSs.containsKey(sHandler.getServerId()))
{
- // If this is the last DS for the domain,
- // shutdown the status analyzer
- if (connectedDSs.size() == 1)
- {
- if (debugEnabled())
- {
- debug("remote server " + sHandler
- + " is the last DS to be stopped: stopping status analyzer");
- }
- stopStatusAnalyzer();
- }
unregisterServerHandler(sHandler, shutdown, true);
- } else if (otherHandlers.contains(sHandler))
+ }
+ else if (otherHandlers.contains(sHandler))
{
unregisterOtherHandler(sHandler);
}
@@ -1052,15 +1135,19 @@
resetGenerationIdIfPossible();
if (!shutdown)
{
- if (isDirectoryServer)
+ synchronized (pendingStatusMessagesLock)
{
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendTopoInfoToAllRSs();
+ if (isDirectoryServer)
+ {
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ pendingStatusMessages.enqueueTopoInfoToAllRSs();
+ }
+ // Warn our DSs that a RS or DS has quit (this handler is not
+ // used because it has already been removed from the list)
+ pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
}
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- sendTopoInfoToAllDSsExcept(null);
+ statusAnalyzer.notifyPendingStatusMessage();
}
}
@@ -1405,94 +1492,64 @@
*
* @param msg
* The message received and to be processed.
- * @param msgEmitter
- * The server handler of the server that emitted the message.
+ * @param sender
+ * The server handler of the server that sent the message.
*/
- public void process(RoutableMsg msg, ServerHandler msgEmitter)
+ void process(RoutableMsg msg, ServerHandler sender)
{
- // Test the message for which a ReplicationServer is expected
- // to be the destination
- if (!(msg instanceof InitializeRequestMsg) &&
- !(msg instanceof InitializeTargetMsg) &&
- !(msg instanceof InitializeRcvAckMsg) &&
- !(msg instanceof EntryMsg) &&
- !(msg instanceof DoneMsg) &&
- (msg.getDestination() == this.localReplicationServer.getServerId()))
+ if (msg.getDestination() == localReplicationServer.getServerId())
{
+ // Handle routable messages targeted at this RS.
if (msg instanceof ErrorMsg)
{
- ErrorMsg errorMsg = (ErrorMsg) msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
- } else if (msg instanceof MonitorRequestMsg)
- {
- replyWithTopologyMonitorMsg(msg, msgEmitter);
- } else if (msg instanceof MonitorMsg)
- {
- MonitorMsg monitorMsg = (MonitorMsg) msg;
- domainMonitor.receiveMonitorDataResponse(monitorMsg,
- msgEmitter.getServerId());
- } else
- {
- replyWithUnroutableMsgType(msgEmitter, msg);
+ logError(ERR_ERROR_MSG_RECEIVED.get(((ErrorMsg) msg).getDetails()));
}
- return;
- }
-
- List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
- if (!servers.isEmpty())
- {
- forwardMsgToAllServers(msg, servers, msgEmitter);
+ else
+ {
+ replyWithUnroutableMsgType(sender, msg);
+ }
}
else
{
- replyWithUnreachablePeerMsg(msgEmitter, msg);
+ // Forward message not destined for this RS.
+ List<ServerHandler> servers = getDestinationServers(msg, sender);
+ if (!servers.isEmpty())
+ {
+ forwardMsgToAllServers(msg, servers, sender);
+ }
+ else
+ {
+ replyWithUnreachablePeerMsg(sender, msg);
+ }
}
}
- private void replyWithTopologyMonitorMsg(RoutableMsg msg,
- ServerHandler msgEmitter)
- {
- /*
- * If the request comes from a Directory Server we need to build the full
- * list of all servers in the topology and send back a MonitorMsg with the
- * full list of all the servers in the topology.
- */
- if (msgEmitter.isDataServer())
- {
- // Monitoring information requested by a DS
- try
- {
- MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID(),
- domainMonitor.getMonitorData());
- msgEmitter.send(monitorMsg);
- }
- catch (IOException e)
- {
- // the connection was closed.
- }
- }
- else
- {
- // Monitoring information requested by a RS
- MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID());
- if (monitorMsg != null)
- {
- try
- {
- msgEmitter.send(monitorMsg);
- }
- catch (IOException e)
- {
- // We log the error. The requestor will detect a timeout or
- // any other failure on the connection.
- logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
- .getDestination())));
- }
- }
- }
+
+ /**
+ * Responds to a monitor request message.
+ *
+ * @param msg
+ * The monitor request message.
+ * @param sender
+ * The DS/RS which sent the monitor request.
+ */
+ void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
+ {
+ enqueueMonitorMsg(msg, sender);
+ }
+
+ /**
+ * Responds to a monitor message.
+ *
+ * @param msg
+ * The monitor message
+ * @param sender
+ * The DS/RS which sent the monitor.
+ */
+ void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
+ {
+ domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
}
private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1553,7 +1610,7 @@
}
private void forwardMsgToAllServers(RoutableMsg msg,
- List<ServerHandler> servers, ServerHandler msgEmitter)
+ List<ServerHandler> servers, ServerHandler sender)
{
for (ServerHandler targetHandler : servers)
{
@@ -1579,13 +1636,13 @@
ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
try
{
- msgEmitter.send(errMsg);
+ sender.send(errMsg);
} catch (IOException ioe1)
{
// an error happened on the sender session trying to recover
// from an error on the receiver session.
// We don't have much solution left beside closing the sessions.
- stopServer(msgEmitter, false);
+ stopServer(sender, false);
stopServer(targetHandler, false);
}
// TODO Handle error properly (sender timeout in addition)
@@ -1651,42 +1708,26 @@
- * @return The newly created and filled MonitorMsg. Null if the current thread
- * was interrupted while attempting to get the domain lock.
+ * @return The newly created and filled MonitorMsg.
*/
- public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+ private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
{
- try
+ final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+ monitorMsg.setReplServerDbState(getLatestServerState());
+
+ // Add the server state for each connected DS and RS.
+ for (DataServerHandler dsHandler : this.connectedDSs.values())
{
- // Lock domain as we need to go through connected servers list
- lock();
- }
- catch (InterruptedException e)
- {
- return null;
+ monitorMsg.setServerState(dsHandler.getServerId(),
+ dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
+ true);
}
- try
+ for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
{
- final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
- monitorMsg.setReplServerDbState(getLatestServerState());
-
- // Add the server state for each connected DS and RS.
- for (DataServerHandler dsHandler : this.connectedDSs.values())
- {
- monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
- .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
- }
-
- for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
- {
- monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
- .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
- }
-
- return monitorMsg;
+ monitorMsg.setServerState(rsHandler.getServerId(),
+ rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
+ false);
}
- finally
- {
- release();
- }
+ return monitorMsg;
}
/**
@@ -1700,6 +1741,7 @@
assuredTimeoutTimer.cancel();
stopAllServers(true);
+ statusAnalyzer.shutdown();
}
/**
@@ -1723,83 +1765,7 @@
return "ReplicationServerDomain " + baseDN;
}
- /**
- * Send a TopologyMsg to all the connected directory servers in order to let
- * them know the topology (every known DSs and RSs).
- *
- * @param notThisOne
- * If not null, the topology message will not be sent to this DS.
- */
- private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
- {
- for (DataServerHandler dsHandler : connectedDSs.values())
- {
- if (dsHandler != notThisOne)
- // All except the supplied one
- {
- for (int i=1; i<=2; i++)
- {
- if (!dsHandler.shuttingDown()
- && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
- {
- TopologyMsg topoMsg =
- createTopologyMsgForDS(dsHandler.getServerId());
- try
- {
- dsHandler.sendTopoInfo(topoMsg);
- break;
- }
- catch (IOException e)
- {
- if (i == 2)
- {
- Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
- baseDN.toNormalizedString(), "directory",
- Integer.toString(dsHandler.getServerId()), e.getMessage());
- logError(message);
- }
- }
- }
- sleep(100);
- }
- }
- }
- }
- /**
- * Send a TopologyMsg to all the connected replication servers
- * in order to let them know our connected LDAP servers.
- */
- private void sendTopoInfoToAllRSs()
- {
- TopologyMsg topoMsg = createTopologyMsgForRS();
- for (ReplicationServerHandler rsHandler : connectedRSs.values())
- {
- for (int i=1; i<=2; i++)
- {
- if (!rsHandler.shuttingDown()
- && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
- {
- try
- {
- rsHandler.sendTopoInfo(topoMsg);
- break;
- }
- catch (IOException e)
- {
- if (i == 2)
- {
- Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
- baseDN.toNormalizedString(), "replication",
- Integer.toString(rsHandler.getServerId()), e.getMessage());
- logError(message);
- }
- }
- }
- sleep(100);
- }
- }
- }
/**
* Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2062,7 +2028,7 @@
return;
}
- sendTopoInfoToAllExcept(senderHandler);
+ enqueueTopoInfoToAllExcept(senderHandler);
Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
senderHandler.getServerId(), baseDN.toNormalizedString(),
@@ -2087,7 +2053,7 @@
* @param event The event to be used for new status computation
* @return True if we have been interrupted (must stop), false otherwise
*/
- public boolean changeStatus(DataServerHandler dsHandler,
+ private boolean changeStatus(DataServerHandler dsHandler,
StatusMachineEvent event)
{
try
@@ -2142,7 +2108,7 @@
return false;
}
- sendTopoInfoToAllExcept(dsHandler);
+ enqueueTopoInfoToAllExcept(dsHandler);
}
catch (Exception e)
{
@@ -2162,7 +2128,7 @@
*/
public void sendTopoInfoToAll()
{
- sendTopoInfoToAllExcept(null);
+ enqueueTopoInfoToAllExcept(null);
}
/**
@@ -2171,10 +2137,14 @@
* @param dsHandler
* if not null, the topology message will not be sent to this DS
*/
- private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
+ private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
{
- sendTopoInfoToAllDSsExcept(dsHandler);
- sendTopoInfoToAllRSs();
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
+ pendingStatusMessages.enqueueTopoInfoToAllRSs();
+ }
+ statusAnalyzer.notifyPendingStatusMessage();
}
/**
@@ -2293,7 +2263,11 @@
* Sends the currently known topology information to every connected
* DS we have.
*/
- sendTopoInfoToAllDSsExcept(null);
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
+ }
+ statusAnalyzer.notifyPendingStatusMessage();
}
catch(Exception e)
{
@@ -2414,37 +2388,6 @@
}
/**
- * Starts the status analyzer for the domain if not already started.
- */
- private void startStatusAnalyzer()
- {
- int degradedStatusThreshold =
- localReplicationServer.getDegradedStatusThreshold();
- if (degradedStatusThreshold > 0) // 0 means no status analyzer
- {
- final StatusAnalyzer thread =
- new StatusAnalyzer(this, degradedStatusThreshold);
- if (statusAnalyzer.compareAndSet(null, thread))
- {
- thread.start();
- }
- }
- }
-
- /**
- * Stops the status analyzer for the domain.
- */
- private void stopStatusAnalyzer()
- {
- final StatusAnalyzer thread = statusAnalyzer.get();
- if (thread != null && statusAnalyzer.compareAndSet(thread, null))
- {
- thread.shutdown();
- thread.waitForShutdown();
- }
- }
-
- /**
* Starts the monitoring publisher for the domain if not already started.
*/
private void startMonitoringPublisher()
@@ -2611,63 +2554,62 @@
}
+
+ private void sendTopologyMsg(String type, ServerHandler handler,
+ TopologyMsg msg)
+ {
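+ // Attempt delivery at most twice, sleeping 100ms between attempts, and
+ // log an error only if the second attempt also fails.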
+ for (int i = 1; i <= 2; i++)
+ {
+ if (!handler.shuttingDown()
+ && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
+ {
+ try
+ {
+ handler.sendTopoInfo(msg);
+ break;
+ }
+ catch (IOException e)
+ {
+ if (i == 2)
+ {
+ logError(ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+ baseDN.toNormalizedString(), type,
+ String.valueOf(handler.getServerId()), e.getMessage()));
+ }
+ }
+ }
+ sleep(100);
+ }
+ }
+
+
+
/**
* Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
* value received, and forwarding the message to the other RSes.
* @param senderHandler The handler for the server that sent the heartbeat.
* @param msg The message to process.
*/
- public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
+ void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
ChangeTimeHeartbeatMsg msg)
{
- try
+ domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+ if (senderHandler.isDataServer())
{
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- }
- catch (InterruptedException ex)
- {
- // We can't deal with this here, so re-interrupt thread so that it is
- // caught during subsequent IO.
- Thread.currentThread().interrupt();
- return;
- }
-
- try
- {
- domainDB.replicaHeartbeat(baseDN, msg.getCSN());
- if (senderHandler.isDataServer())
+ /*
+ * If we are the first replication server to be notified, then forward
+ * the message to the remote replication servers.
+ */
+ synchronized (pendingStatusMessagesLock)
{
- // If we are the first replication server warned,
- // then forwards the message to the remote replication servers
- for (ReplicationServerHandler rsHandler : connectedRSs.values())
- {
- try
- {
- if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
- {
- rsHandler.send(msg);
- }
- }
- catch (IOException e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- logError(ERR_CHANGELOG_ERROR_SENDING_MSG
- .get("Replication Server "
- + localReplicationServer.getReplicationPort() + " "
- + baseDN + " " + localReplicationServer.getServerId()));
- stopServer(rsHandler, false);
- }
- }
+ pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
}
- }
- finally
- {
- release();
+ statusAnalyzer.notifyPendingStatusMessage();
}
}
+
+
/**
* Get the latest (more recent) trim date of the changelog dbs associated
* to this domain.
@@ -2703,33 +2645,6 @@
}
/**
- * Update the status analyzer with the new threshold value.
- *
- * @param degradedStatusThreshold
- * The new threshold value.
- */
- void updateDegradedStatusThreshold(int degradedStatusThreshold)
- {
- if (degradedStatusThreshold == 0)
- {
- // Requested to stop analyzers
- stopStatusAnalyzer();
- return;
- }
-
- final StatusAnalyzer saThread = statusAnalyzer.get();
- if (saThread != null) // it is running
- {
- saThread.setDegradedStatusThreshold(degradedStatusThreshold);
- }
- else if (connectedDSs.size() > 0)
- {
- // Requested to start analyzers with provided threshold value
- startStatusAnalyzer();
- }
- }
-
- /**
* Update the monitoring publisher with the new period value.
*
* @param period
@@ -2765,7 +2680,6 @@
*/
public void register(DataServerHandler dsHandler)
{
- startStatusAnalyzer();
startMonitoringPublisher();
// connected with new DS: store handler.
@@ -2773,7 +2687,7 @@
// Tell peer RSs and DSs a new DS just connected to us
// No need to re-send TopologyMsg to this just new DS
- sendTopoInfoToAllExcept(dsHandler);
+ enqueueTopoInfoToAllExcept(dsHandler);
}
/**
@@ -2798,4 +2712,211 @@
+ " and port=" + localReplicationServer.getReplicationPort()
+ ": " + message);
}
+
+
+
+ /**
+ * Goes through each connected DS, gets the number of pending changes we have
+ * for it, and changes its status if the threshold value is crossed or uncrossed.
+ */
+ void checkDSDegradedStatus()
+ {
+ final int degradedStatusThreshold = localReplicationServer
+ .getDegradedStatusThreshold();
+ // Threshold value = 0 means no status analyzer (no degrading system)
+ // we should not have that as the status analyzer thread should not be
+ // created if this is the case, but for sanity purpose, we add this
+ // test
+ if (degradedStatusThreshold > 0)
+ {
+ for (DataServerHandler serverHandler : connectedDSs.values())
+ {
+ // Get number of pending changes for this server
+ final int nChanges = serverHandler.getRcvMsgQueueSize();
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In RS " + getLocalRSServerId() + ", for baseDN="
+ + getBaseDN() + ": " + "Status analyzer: DS "
+ + serverHandler.getServerId() + " has " + nChanges
+ + " message(s) in writer queue.");
+ }
+
+ // Check status to know if it is relevant to change the status. Do not
+ // take RSD lock to test. If we attempt to change the status whereas
+ // the current status does allow it, this will be noticed by
+ // the changeStatusFromStatusAnalyzer() method. This allows to take the
+ // lock roughly only when needed versus every sleep time timeout.
+ if (nChanges >= degradedStatusThreshold)
+ {
+ if (serverHandler.getStatus() == NORMAL_STATUS
+ && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
+ {
+ break; // Interrupted.
+ }
+ }
+ else
+ {
+ if (serverHandler.getStatus() == DEGRADED_STATUS
+ && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
+ {
+ break; // Interrupted.
+ }
+ }
+ }
+ }
+ }
+
+
+
+ /**
+ * Sends any enqueued status messages to the rest of the topology.
+ */
+ void sendPendingStatusMessages()
+ {
+ /*
+ * Take a snapshot of pending status notifications in order to avoid holding
+ * the broadcast lock for too long. In addition, clear the notifications so
+ * that they are not resent the next time.
+ */
+ final PendingStatusMessages savedState;
+ synchronized (pendingStatusMessagesLock)
+ {
+ savedState = pendingStatusMessages;
+ pendingStatusMessages = new PendingStatusMessages();
+ }
+ sendPendingChangeTimeHeartbeatMsgs(savedState);
+ sendPendingTopologyMsgs(savedState);
+ sendPendingMonitorMsgs(savedState);
+ }
+
+
+
+ private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
+ {
+ for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
+ .entrySet())
+ {
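+ // The target DS may have disconnected since the monitor message was
+ // enqueued, in which case the message is simply dropped.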
+ ServerHandler ds = connectedDSs.get(msg.getKey());
+ if (ds != null)
+ {
+ try
+ {
+ ds.send(msg.getValue());
+ }
+ catch (IOException e)
+ {
+ // Ignore: connection closed.
+ }
+ }
+ }
+ for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
+ .entrySet())
+ {
+ ServerHandler rs = connectedRSs.get(msg.getKey());
+ if (rs != null)
+ {
+ try
+ {
+ rs.send(msg.getValue());
+ }
+ catch (IOException e)
+ {
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+
+ // FIXME: why do we log for RSs but not DSs?
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(String.valueOf(msg
+ .getValue().getDestination())));
+ }
+ }
+ }
+ }
+
+
+
+ private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
+ {
+ for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
+ .values())
+ {
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
+ {
+ try
+ {
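+ // Change time heartbeats are only understood by peers using
+ // protocol version 3 or later.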
+ if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
+ {
+ rsHandler.send(pendingHeartbeat);
+ }
+ }
+ catch (IOException e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get("Replication Server "
+ + localReplicationServer.getReplicationPort() + " " + baseDN
+ + " " + localReplicationServer.getServerId()));
+ stopServer(rsHandler, false);
+ }
+ }
+ }
+ }
+
+
+
+ private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
+ {
+ if (pendingMsgs.sendDSTopologyMsg)
+ {
+ for (ServerHandler handler : connectedDSs.values())
+ {
+ if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
+ {
+ final TopologyMsg topoMsg = createTopologyMsgForDS(handler
+ .getServerId());
+ sendTopologyMsg("directory", handler, topoMsg);
+ }
+ }
+ }
+
+ if (pendingMsgs.sendRSTopologyMsg)
+ {
+ final TopologyMsg topoMsg = createTopologyMsgForRS();
+ for (ServerHandler handler : connectedRSs.values())
+ {
+ sendTopologyMsg("replication", handler, topoMsg);
+ }
+ }
+ }
+
+
+
+ private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
+ {
+ /*
+ * If the request comes from a Directory Server we need to build the full
+ * list of all servers in the topology and send back a MonitorMsg with the
+ * full list of all the servers in the topology.
+ */
+ if (sender.isDataServer())
+ {
+ MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID(),
+ domainMonitor.getMonitorData());
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
+ monitorMsg);
+ }
+ }
+ else
+ {
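+ // Monitoring information requested by a RS: reply with this
+ // server's local view of the topology.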
+ MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID());
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
+ monitorMsg);
+ }
+ }
+ statusAnalyzer.notifyPendingStatusMessage();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c7efe83..9c165a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server;
import java.io.IOException;
+
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
@@ -484,7 +485,7 @@
*/
public ReplicationServerDomain getDomain()
{
- return this.replicationServerDomain;
+ return replicationServerDomain;
}
/**
@@ -847,21 +848,45 @@
*
* @param msg The message to be processed.
*/
- public void process(RoutableMsg msg)
+ void process(RoutableMsg msg)
{
if (debugEnabled())
+ {
TRACER.debugInfo("In "
+ replicationServerDomain.getLocalRSMonitorInstanceName() + " "
+ this + " processes routable msg received:" + msg);
+ }
replicationServerDomain.process(msg, this);
}
/**
+ * Responds to a monitor request message.
+ *
+ * @param msg
+ * The monitor request message.
+ */
+ void processMonitorRequestMsg(MonitorRequestMsg msg)
+ {
+ replicationServerDomain.processMonitorRequestMsg(msg, this);
+ }
+
+ /**
+ * Responds to a monitor message.
+ *
+ * @param msg
+ * The monitor message.
+ */
+ void processMonitorMsg(MonitorMsg msg)
+ {
+ replicationServerDomain.processMonitorMsg(msg, this);
+ }
+
+ /**
* Processes a change time heartbeat msg.
*
* @param msg The message to be processed.
*/
- public void process(ChangeTimeHeartbeatMsg msg)
+ void process(ChangeTimeHeartbeatMsg msg)
{
if (debugEnabled())
TRACER.debugInfo("In "
@@ -925,15 +950,6 @@
}
/**
- * Sets the replication server domain associated.
- * @param rsd The provided replication server domain.
- */
- protected void setReplicationServerDomain(ReplicationServerDomain rsd)
- {
- this.replicationServerDomain = rsd;
- }
-
- /**
* Sets the window size when used when sending to the remote.
* @param size The provided window size.
*/
@@ -1179,7 +1195,7 @@
* Process a Ack message received.
* @param ack the message received.
*/
- public void processAck(AckMsg ack)
+ void processAck(AckMsg ack)
{
if (replicationServerDomain!=null)
replicationServerDomain.processAck(ack, this);
@@ -1200,7 +1216,7 @@
* Process a ResetGenerationIdMsg message received.
* @param msg the message received.
*/
- public void processResetGenId(ResetGenerationIdMsg msg)
+ void processResetGenId(ResetGenerationIdMsg msg)
{
if (replicationServerDomain!=null)
replicationServerDomain.resetGenerationId(this, msg);
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index ed74a43..6e85261 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -180,8 +180,23 @@
} else if (msg instanceof WindowMsg)
{
handler.updateWindow((WindowMsg) msg);
- } else if (msg instanceof RoutableMsg)
+ }
+ else if (msg instanceof MonitorRequestMsg)
{
+ handler.processMonitorRequestMsg((MonitorRequestMsg) msg);
+ }
+ else if (msg instanceof MonitorMsg)
+ {
+ handler.processMonitorMsg((MonitorMsg) msg);
+ }
+ else if (msg instanceof RoutableMsg)
+ {
+ /*
+ * Note that we handle monitor messages separately since they in
+ * fact never need "routing" and are instead sent directly between
+ * connected peers. Doing so allows us to more clearly decouple
+ * write IO from the reader thread (see OPENDJ-1354).
+ */
handler.process((RoutableMsg) msg);
} else if (msg instanceof ResetGenerationIdMsg)
{
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 34185c6..721ec23 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -22,18 +22,19 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
+
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.types.DebugLogLevel;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
/**
* This thread is in charge of periodically determining if the connected
@@ -44,46 +45,45 @@
* the threshold is uncrossed, the status analyzer must make the DS status
* change back to NORMAL_STATUS. To have meaning of status, please refer to
* ServerStatus class.
+ * <p>
+ * In addition, this thread is responsible for publishing any pending status
+ * messages.
*/
-public class StatusAnalyzer extends DirectoryThread
+class StatusAnalyzer extends DirectoryThread
{
-
- private volatile boolean shutdown = false;
-
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private final ReplicationServerDomain replicationServerDomain;
- private volatile int degradedStatusThreshold = -1;
-
/** Sleep time for the thread, in ms. */
private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
- private volatile boolean done = false;
+ private final ReplicationServerDomain replicationServerDomain;
+ private final Object eventMonitor = new Object();
+ private boolean pendingStatusMessage = false;
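+ // Time (ms since epoch) at which the next DS degraded status check is due.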
+ private long nextCheckDSDegradedStatusTime;
- private final Object shutdownLock = new Object();
+
/**
* Create a StatusAnalyzer.
- * @param replicationServerDomain The ReplicationServerDomain the status
- * analyzer is for.
- * @param degradedStatusThreshold The pending changes threshold value to be
- * used for putting a DS in DEGRADED_STATUS.
+ *
+ * @param replicationServerDomain
+ * The ReplicationServerDomain the status analyzer is for.
*/
- public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
- int degradedStatusThreshold)
+ StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
{
super("Replication server RS("
+ replicationServerDomain.getLocalRSServerId()
- + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
+ + ") status monitor for domain \""
+ + replicationServerDomain.getBaseDN()
+ "\"");
-
this.replicationServerDomain = replicationServerDomain;
- this.degradedStatusThreshold = degradedStatusThreshold;
}
+
+
/**
* Analyzes if servers are late or not, and change their status accordingly.
*/
@@ -96,79 +96,55 @@
getMessage("Directory server status analyzer starting."));
}
- while (!shutdown)
+ try
{
- synchronized (shutdownLock)
+ while (true)
{
- if (!shutdown)
+ final boolean requestStatusBroadcastWasRequested;
+ synchronized (eventMonitor)
{
- try
+ if (!isShutdownInitiated() && !pendingStatusMessage)
{
- shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+ eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
}
- catch (InterruptedException e)
- {
- // Server shutdown monitor may interrupt slow threads.
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- shutdown = true;
- break;
- }
- }
- }
-
- // Go through each connected DS, get the number of pending changes we have
- // for it and change status accordingly if threshold value is
- // crossed/uncrossed
- for (DataServerHandler serverHandler :
- replicationServerDomain.getConnectedDSs().values())
- {
- // Get number of pending changes for this server
- int nChanges = serverHandler.getRcvMsgQueueSize();
- if (debugEnabled())
- {
- TRACER.debugInfo(getMessage("Status analyzer: DS "
- + serverHandler.getServerId() + " has " + nChanges
- + " message(s) in writer queue."));
+ requestStatusBroadcastWasRequested = pendingStatusMessage;
+ pendingStatusMessage = false;
}
- // Check status to know if it is relevant to change the status. Do not
- // take RSD lock to test. If we attempt to change the status whereas
- // the current status does allow it, this will be noticed by
- // the changeStatusFromStatusAnalyzer() method. This allows to take the
- // lock roughly only when needed versus every sleep time timeout.
- if (degradedStatusThreshold > 0)
- // Threshold value = 0 means no status analyzer (no degrading system)
- // we should not have that as the status analyzer thread should not be
- // created if this is the case, but for sanity purpose, we add this
- // test
+ if (isShutdownInitiated())
{
- if (nChanges >= degradedStatusThreshold)
- {
- if (serverHandler.getStatus() == NORMAL_STATUS
- && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
- {
- break;
- }
- }
- else
- {
- if (serverHandler.getStatus() == DEGRADED_STATUS
- && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
- {
- break;
- }
- }
+ break;
+ }
+
+ // Broadcast heartbeats, topology messages, etc. if requested.
+ if (requestStatusBroadcastWasRequested)
+ {
+ replicationServerDomain.sendPendingStatusMessages();
+ }
+
+ /*
+ * Check the degraded status for connected DS instances only if
+ * sufficient time has passed. The current time is not cached because
+ * the call to checkDSDegradedStatus may take some time.
+ */
+ if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
+ {
+ replicationServerDomain.checkDSDegradedStatus();
+ nextCheckDSDegradedStatusTime = System.currentTimeMillis()
+ + STATUS_ANALYZER_SLEEP_TIME;
}
}
}
+ catch (InterruptedException e)
+ {
+ // Forcefully stopped.
+ }
- done = true;
TRACER.debugInfo(getMessage("Status analyzer is terminated."));
}
+
+
private String getMessage(String message)
{
return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -176,75 +152,50 @@
+ message;
}
- private boolean isInterrupted(DataServerHandler serverHandler,
- StatusMachineEvent event)
- {
- if (replicationServerDomain.changeStatus(serverHandler, event))
- {
- // Finish job and let thread die
- TRACER.debugInfo(
- getMessage("Status analyzer has been interrupted and will die."));
- return true;
- }
- return false;
- }
+
/**
* Stops the thread.
*/
- public void shutdown()
+ void shutdown()
{
- synchronized (shutdownLock)
- {
- shutdown = true;
- shutdownLock.notifyAll();
-
- if (debugEnabled())
- {
- TRACER.debugInfo(getMessage("Shutting down status analyzer."));
- }
- }
- }
-
- /**
- * Waits for analyzer death. If not finished within 2 seconds,
- * forces interruption
- */
- public void waitForShutdown()
- {
- try
- {
- int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
- int n = 0;
- while (!done && this.isAlive())
- {
- Thread.sleep(50);
- n++;
- if (n >= FACTOR)
- {
- TRACER.debugInfo(getMessage("Interrupting status analyzer."));
- interrupt();
- }
- }
- } catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
- }
- }
-
- /**
- * Sets the threshold value.
- * @param degradedStatusThreshold The new threshold value.
- */
- public void setDegradedStatusThreshold(int degradedStatusThreshold)
- {
+ initiateShutdown();
if (debugEnabled())
{
- TRACER.debugInfo(getMessage(
- "Directory server status analyzer changing threshold value to "
- + degradedStatusThreshold));
+ TRACER.debugInfo(getMessage("Shutting down status analyzer."));
}
+ synchronized (eventMonitor)
+ {
+ eventMonitor.notifyAll();
+ }
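+ // Wait for up to 2 seconds for the analyzer to terminate gracefully.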
+ try
+ {
+ join(2000);
+ }
+ catch (InterruptedException e)
+ {
+ // Trapped: forcefully stop the thread.
+ }
+ if (isAlive())
+ {
+ // The join timed out or was interrupted so attempt to forcefully stop the
+ // analyzer.
+ interrupt();
+ }
+ }
- this.degradedStatusThreshold = degradedStatusThreshold;
+
+
+ /**
+ * Requests that a topology-state-related message be broadcast to the rest of
+ * the topology. Messages include DS heartbeats, topology information, etc.
+ */
+ void notifyPendingStatusMessage()
+ {
+ synchronized (eventMonitor)
+ {
+ pendingStatusMessage = true;
+ eventMonitor.notifyAll();
+ }
}
}
--
Gitblit v1.10.0