From f9fc57cba81bcf9a8259a8ea699eb999c0415397 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 21 Aug 2013 13:26:47 +0000
Subject: [PATCH] Extracted ReplicationDomainMonitor class out of ReplicationServerDomain to increase its cohesion.
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 472 +++++++---------------------------------------------------
1 files changed, 61 insertions(+), 411 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3a10c23..2bedd43 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -32,7 +32,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,7 +50,6 @@
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
-import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -92,6 +90,11 @@
*/
private AtomicReference<MonitoringPublisher> monitoringPublisher =
new AtomicReference<MonitoringPublisher>();
+ /**
+ * Maintains monitor data for the current domain (assigned once).
+ */
+ private final ReplicationDomainMonitor domainMonitor =
+ new ReplicationDomainMonitor(this);
/**
* The following map contains one balanced tree for each replica ID to which
@@ -128,63 +131,6 @@
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
- // Monitor data management
- /**
- * The monitor data consolidated over the topology.
- */
- private volatile MonitorData monitorData = new MonitorData();
-
- /**
- * This lock guards against multiple concurrent monitor data recalculation.
- */
- private final Object pendingMonitorLock = new Object();
-
- /** Guarded by pendingMonitorLock. */
- private long monitorDataLastBuildDate = 0;
-
- /**
- * The set of replication servers which are already known to be slow to send
- * monitor data.
- * <p>
- * Guarded by pendingMonitorLock.
- */
- private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
-
- /** This lock serializes updates to the pending monitor data. */
- private final Object pendingMonitorDataLock = new Object();
-
- /**
- * Monitor data which is currently being calculated. Guarded by
- * pendingMonitorDataLock.
- */
- private MonitorData pendingMonitorData;
-
- /**
- * A set containing the IDs of servers from which we are currently expecting
- * monitor responses. When a response is received from a server we remove the
- * ID from this table, and count down the latch if the ID was in the table.
- * <p>
- * Guarded by pendingMonitorDataLock.
- */
- private final Set<Integer> pendingMonitorDataServerIDs =
- new HashSet<Integer>();
-
- /**
- * This latch is non-null and is used in order to count incoming responses as
- * they arrive. Since incoming response may arrive at any time, even when
- * there is no pending monitor request, access to the latch must be guarded.
- * <p>
- * Guarded by pendingMonitorDataLock.
- */
- private CountDownLatch pendingMonitorDataLatch = null;
-
- /**
- * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
- */
- private final long monitorDataLifeTime = 500;
-
-
-
/**
* The needed info for each received assured update message we are waiting
* acks for.
@@ -251,11 +197,7 @@
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
-
- if (generationId < 0)
- {
- generationId = sourceHandler.getGenerationId();
- }
+ setGenerationIdIfUnset(sourceHandler.getGenerationId());
/**
* If this is an assured message (a message requesting ack), we must
@@ -424,19 +366,17 @@
{
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
{
- TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
- + update.getChangeNumber()
+ TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
+ " will not be sent to directory server "
+ dsHandler.getServerId() + " with generation id "
+ dsHandler.getGenerationId() + " different from local "
- + "generation id " + generationId);
+ + "generation id " + generationId));
}
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
{
- TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
- + " for dn " + baseDn + ", update " + update.getChangeNumber()
+ TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
+ " will not be sent to directory server "
- + dsHandler.getServerId() + " as it is in full update");
+ + dsHandler.getServerId() + " as it is in full update"));
}
}
@@ -869,11 +809,9 @@
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
if (debugEnabled())
{
- TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
- + " for "+ baseDn
- + ", sending timeout for assured update with change "
- + " number " + cn + " to server id "
- + origServer.getServerId());
+ TRACER.debugInfo(getMessage(
+ "sending timeout for assured update with change number " + cn
+ + " to server id " + origServer.getServerId()));
}
try
{
@@ -1112,8 +1050,7 @@
unregisterServerHandler(sHandler);
sHandler.shutdown();
- // Check if generation id has to be reset
- mayResetGenerationId();
+ resetGenerationIdIfPossible();
if (!shutdown)
{
if (isDirectoryServer)
@@ -1211,16 +1148,13 @@
* - traverse replicationServers list and test for each if DS are connected
* So it strongly relies on the directoryServers list
*/
- private void mayResetGenerationId()
+ private void resetGenerationIdIfPossible()
{
- String prefix =
- "In RS " + this.localReplicationServer.getMonitorInstanceName()
- + " for " + baseDn + " ";
-
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus="
- + generationIdSavedStatus);
+ TRACER.debugInfo(getMessage(
+ "mayResetGenerationId generationIdSavedStatus="
+ + generationIdSavedStatus));
}
// If there is no more any LDAP server connected to this domain in the
@@ -1235,9 +1169,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "mayResetGenerationId skip RS "
+ TRACER.debugInfo(getMessage("mayResetGenerationId skip RS "
+ rsHandler.getMonitorInstanceName()
- + " that has different genId");
+ + " that has different genId"));
}
}
else if (rsHandler.hasRemoteLDAPServers())
@@ -1246,10 +1180,10 @@
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "mayResetGenerationId RS "
+ TRACER.debugInfo(getMessage("mayResetGenerationId RS "
+ rsHandler.getMonitorInstanceName()
+ " has ldap servers connected to it"
- + " - will not reset generationId");
+ + " - will not reset generationId"));
}
break;
}
@@ -1261,13 +1195,13 @@
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "has ldap servers connected to it"
- + " - will not reset generationId");
+ TRACER.debugInfo(getMessage("has ldap servers connected to it"
+ + " - will not reset generationId"));
}
}
if (!ldapServersConnectedInTheTopology
- && !this.generationIdSavedStatus
+ && !generationIdSavedStatus
&& generationId != -1)
{
changeGenerationId(-1, false);
@@ -1544,7 +1478,8 @@
} else if (msg instanceof MonitorMsg)
{
MonitorMsg monitorMsg = (MonitorMsg) msg;
- receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
+ domainMonitor.receiveMonitorDataResponse(monitorMsg,
+ msgEmitter.getServerId());
} else
{
replyWithUnroutableMsgType(msgEmitter, msg);
@@ -1573,10 +1508,10 @@
if (msgEmitter.isDataServer())
{
// Monitoring information requested by a DS
- MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID(), monitorData);
try
{
+ MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID());
msgEmitter.send(monitorMsg);
}
catch (IOException e)
@@ -1711,33 +1646,25 @@
* The sender of this message.
* @param destination
* The destination of this message.
- * @param monitorData
- * The domain monitor data which should be used for the message.
* @return The newly created and filled MonitorMsg. Null if a problem occurred
* during message creation.
*/
- public MonitorMsg createGlobalTopologyMonitorMsg(
- int sender, int destination, MonitorData monitorData)
+ public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
{
final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
-
returnMsg.setReplServerDbState(getDbServerState());
- // Add the informations about the Replicas currently in the topology.
- Iterator<Integer> it = monitorData.ldapIterator();
- while (it.hasNext())
+ // Add the server state for each DS and RS currently in the topology.
+ final MonitorData monitorData = getDomainMonitorData();
+ for (int replicaId : toIterable(monitorData.ldapIterator()))
{
- int replicaId = it.next();
returnMsg.setServerState(replicaId,
monitorData.getLDAPServerState(replicaId),
monitorData.getApproxFirstMissingDate(replicaId), true);
}
- // Add the information about the RSs currently in the topology.
- it = monitorData.rsIterator();
- while (it.hasNext())
+ for (int replicaId : toIterable(monitorData.rsIterator()))
{
- int replicaId = it.next();
returnMsg.setServerState(replicaId,
monitorData.getRSStates(replicaId),
monitorData.getRSApproxFirstMissingDate(replicaId), false);
@@ -1774,27 +1701,22 @@
try
{
- MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+ final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+ monitorMsg.setReplServerDbState(getDbServerState());
- // Populate for each connected LDAP Server
- // from the states stored in the serverHandler.
- // - the server state
- // - the older missing change
+ // Add the server state for each connected DS and RS.
for (DataServerHandler dsHandler : this.connectedDSs.values())
{
monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
.getServerState(), dsHandler.getApproxFirstMissingDate(), true);
}
- // Same for the connected RS
for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
{
monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
.getServerState(), rsHandler.getApproxFirstMissingDate(), false);
}
- // Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerDbState(getDbServerState());
return monitorMsg;
}
finally
@@ -2060,7 +1982,6 @@
if (this.generationId != generationId)
{
- // we are changing of genId
clearDbs();
this.generationId = generationId;
@@ -2187,10 +2108,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " + getLocalRSServerId() +
- " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
- " for baseDn " + baseDn + ":\n" + csMsg);
+ TRACER.debugInfo(getMessage("receiving ChangeStatusMsg from "
+ + senderHandler.getServerId() + ":\n" + csMsg));
}
try
@@ -2421,9 +2340,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In RS " + getLocalRSServerId()
- + " Receiving TopologyMsg from " + rsHandler.getServerId()
- + " for baseDn " + baseDn + ":\n" + topoMsg);
+ TRACER.debugInfo(getMessage("receiving TopologyMsg from "
+ + rsHandler.getServerId() + ":\n" + topoMsg));
}
try
@@ -2448,12 +2366,8 @@
// Handle generation id
if (allowResetGenId)
{
- // Check if generation id has to be reset
- mayResetGenerationId();
- if (generationId < 0)
- {
- generationId = rsHandler.getGenerationId();
- }
+ resetGenerationIdIfPossible();
+ setGenerationIdIfUnset(rsHandler.getGenerationId());
}
if (isDifferentGenerationId(rsHandler.getGenerationId()))
@@ -2487,10 +2401,13 @@
}
}
- /* =======================
- * Monitor Data generation
- * =======================
- */
+ private void setGenerationIdIfUnset(long generationId)
+ {
+ if (this.generationId < 0) // negative: no generation id set yet
+ {
+ this.generationId = generationId; // adopt the supplied one
+ }
+ }
/**
* Returns the latest monitor data available for this replication server
@@ -2501,274 +2418,7 @@
*/
MonitorData getDomainMonitorData()
{
- return monitorData;
- }
-
-
-
- /**
- * Recomputes the monitor data for this replication server domain.
- *
- * @return The recomputed monitor data for this replication server domain.
- * @throws InterruptedException
- * If this thread is interrupted while waiting for a response.
- */
- MonitorData computeDomainMonitorData() throws InterruptedException
- {
- // Only allow monitor recalculation at a time.
- synchronized (pendingMonitorLock)
- {
- if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
- .getTime())
- {
- try
- {
- // Prevent out of band monitor responses from updating our pending
- // table until we are ready.
- synchronized (pendingMonitorDataLock)
- {
- // Clear the pending monitor data.
- pendingMonitorDataServerIDs.clear();
- pendingMonitorData = new MonitorData();
-
- // Initialize the monitor data.
- initializePendingMonitorData();
-
- // Send the monitor requests to the connected replication servers.
- for (ReplicationServerHandler rs : connectedRSs.values())
- {
- // Add server ID to pending table.
- int serverId = rs.getServerId();
-
- MonitorRequestMsg msg = new MonitorRequestMsg(
- this.localReplicationServer.getServerId(), serverId);
- try
- {
- rs.send(msg);
-
- // Only register this server ID if we were able to send the
- // message.
- pendingMonitorDataServerIDs.add(serverId);
- }
- catch (IOException e)
- {
- // Log a message and do a best effort from here.
- Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
- .get(baseDn, serverId, e.getMessage());
- logError(message);
- }
- }
-
- // Create the pending response latch based on the number of expected
- // monitor responses.
- pendingMonitorDataLatch = new CountDownLatch(
- pendingMonitorDataServerIDs.size());
- }
-
- // Wait for the responses to come back.
- pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
-
- // Log messages for replication servers that have gone or come back.
- synchronized (pendingMonitorDataLock)
- {
- // Log servers that have come back.
- for (int serverId : monitorDataLateServers)
- {
- // Ensure that we only log once per server: don't fill the
- // error log with repeated messages.
- if (!pendingMonitorDataServerIDs.contains(serverId))
- {
- logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
- serverId));
- }
- }
-
- // Log servers that have gone away.
- for (int serverId : pendingMonitorDataServerIDs)
- {
- // Ensure that we only log once per server: don't fill the
- // error log with repeated messages.
- if (!monitorDataLateServers.contains(serverId))
- {
- logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
- serverId));
- }
- }
-
- // Remember which servers were late this time.
- monitorDataLateServers.clear();
- monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
- }
-
- // Store the new computed data as the reference
- synchronized (pendingMonitorDataLock)
- {
- // Now we have the expected answers or an error occurred
- pendingMonitorData.completeComputing();
- monitorData = pendingMonitorData;
- monitorDataLastBuildDate = TimeThread.getTime();
- }
- }
- finally
- {
- synchronized (pendingMonitorDataLock)
- {
- // Clear pending state.
- pendingMonitorData = null;
- pendingMonitorDataLatch = null;
- pendingMonitorDataServerIDs.clear();
- }
- }
- }
- }
-
- return monitorData;
- }
-
-
-
- /**
- * Start collecting global monitoring information for this
- * ReplicationServerDomain.
- */
-
- private void initializePendingMonitorData()
- {
- // Let's process our directly connected DS
- // - in the ServerHandler for a given DS1, the stored state contains :
- // - the max CN produced by DS1
- // - the last CN consumed by DS1 from DS2..n
- // - in the RSdomain/dbHandler, the built-in state contains :
- // - the max CN produced by each server
- // So for a given DS connected we can take the state and the max from
- // the DS/state.
-
- for (ServerHandler ds : connectedDSs.values())
- {
- int serverID = ds.getServerId();
-
- // the state comes from the state stored in the SH
- ServerState dsState = ds.getServerState().duplicate();
-
- // the max CN sent by that LS also comes from the SH
- ChangeNumber maxcn = dsState.getChangeNumber(serverID);
- if (maxcn == null)
- {
- // This directly connected LS has never produced any change
- maxcn = new ChangeNumber(0, 0, serverID);
- }
- pendingMonitorData.setMaxCN(serverID, maxcn);
- pendingMonitorData.setLDAPServerState(serverID, dsState);
- pendingMonitorData.setFirstMissingDate(serverID,
- ds.getApproxFirstMissingDate());
- }
-
- // Then initialize the max CN for the LS that produced something
- // - from our own local db state
- // - whatever they are directly or indirectly connected
- ServerState dbServerState = getDbServerState();
- pendingMonitorData.setRSState(localReplicationServer.getServerId(),
- dbServerState);
- for (int serverId : dbServerState) {
- ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
- pendingMonitorData.setMaxCN(serverId, storedCN);
- }
- }
-
-
-
- /**
- * Processes a Monitor message receives from a remote Replication Server and
- * stores the data received.
- *
- * @param msg
- * The message to be processed.
- * @param serverId
- * server handler that is receiving the message.
- */
- private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
- {
- synchronized (pendingMonitorDataLock)
- {
- if (pendingMonitorData == null)
- {
- // This is a response for an earlier request whose computing is
- // already complete.
- logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
- msg.getSenderID()));
- return;
- }
-
- try
- {
- // Here is the RS state : list <serverID, lastChangeNumber>
- // For each LDAP Server, we keep the max CN across the RSes
- ServerState replServerState = msg.getReplServerDbState();
- pendingMonitorData.setMaxCNs(replServerState);
-
- // store the remote RS states.
- pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
-
- // Store the remote LDAP servers states
- Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
- while (dsServerIdIterator.hasNext())
- {
- int dsServerId = dsServerIdIterator.next();
- ServerState dsServerState = msg.getLDAPServerState(dsServerId);
- pendingMonitorData.setMaxCNs(dsServerState);
- pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
- pendingMonitorData.setFirstMissingDate(dsServerId,
- msg.getLDAPApproxFirstMissingDate(dsServerId));
- }
-
- // Process the latency reported by the remote RSi on its connections
- // to the other RSes
- Iterator<Integer> rsServerIdIterator = msg.rsIterator();
- while (rsServerIdIterator.hasNext())
- {
- int rsServerId = rsServerIdIterator.next();
- long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
- if (rsServerId == localReplicationServer.getServerId())
- {
- // this is the latency of the remote RSi regarding the current RS
- // let's update the fmd of my connected LS
- for (DataServerHandler connectedDS : connectedDSs.values())
- {
- int connectedServerId = connectedDS.getServerId();
- pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
- }
- }
- else
- {
- // this is the latency of the remote RSi regarding another RSj
- // let's update the latency of the LSes connected to RSj
- ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
- if (rsjHdr != null)
- {
- for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
- {
- pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
- }
- }
- }
- }
- }
- catch (RuntimeException e)
- {
- // FIXME: do we really expect these???
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
- e.getMessage() + stackTraceToSingleLineString(e)));
- }
- finally
- {
- // Decreases the number of expected responses and potentially
- // wakes up the waiting requester thread.
- if (pendingMonitorDataServerIDs.remove(serverId))
- {
- pendingMonitorDataLatch.countDown();
- }
- }
- }
+ return domainMonitor.getMonitorData();
}
/**
@@ -2791,7 +2441,7 @@
*/
public Map<Integer, DataServerHandler> getConnectedDSs()
{
- return connectedDSs;
+ return Collections.unmodifiableMap(connectedDSs); // read-only view
}
/**
@@ -2800,7 +2450,7 @@
*/
public Map<Integer, ReplicationServerHandler> getConnectedRSs()
{
- return connectedRSs;
+ return Collections.unmodifiableMap(connectedRSs); // read-only view
}
@@ -2960,19 +2610,13 @@
String.valueOf(localReplicationServer.getServerId())));
attributes.add(Attributes.create("replication-server-port",
String.valueOf(localReplicationServer.getReplicationPort())));
-
- // Add all the base DNs that are known by this replication server.
attributes.add(Attributes.create("domain-name", baseDn));
-
- // Publish to monitor the generation ID by replicationServerDomain
attributes.add(Attributes.create("generation-id",
baseDn + " " + generationId));
- MonitorData md = getDomainMonitorData();
-
// Missing changes
- long missingChanges = md.getMissingChangesRS(localReplicationServer
- .getServerId());
+ long missingChanges = getDomainMonitorData().getMissingChangesRS(
+ localReplicationServer.getServerId());
attributes.add(Attributes.create("missing-changes",
String.valueOf(missingChanges)));
@@ -3439,4 +3083,10 @@
// connection): store handler.
connectedRSs.put(rsHandler.getServerId(), rsHandler);
}
+
+ private String getMessage(String message)
+ {
+ final int rsId = localReplicationServer.getServerId();
+ return "In RS " + rsId + " for " + baseDn + ": " + message;
+ }
}
--
Gitblit v1.10.0