From 9d06956010f0e9558909a802cf6ed81c27ca5dd8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 21 Aug 2013 13:26:47 +0000
Subject: [PATCH] Extracted ReplicationDomainMonitor class out of ReplicationServerDomain to increase its cohesion.
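
ReplicationServerDomain previously mixed connection and message handling
with the bookkeeping needed to compute consolidated monitor data. This
change moves the monitor data state and its computation into a dedicated
ReplicationDomainMonitor class, so each class now has a single
responsibility.

A rough sketch of the resulting collaboration (names are taken from this
patch; given a ReplicationServerDomain "domain", the domain wires this up
internally):

    // Each domain owns one monitor and delegates monitor data handling to it.
    ReplicationDomainMonitor domainMonitor = new ReplicationDomainMonitor(domain);

    // Trigger a recomputation; results are cached for up to 500 ms.
    MonitorData fresh = domainMonitor.computeDomainMonitorData();

    // Read the latest available snapshot without triggering a recomputation.
    MonitorData latest = domainMonitor.getMonitorData();

    // Incoming MonitorMsg responses from remote RSs are forwarded by the
    // domain to:
    // domainMonitor.receiveMonitorDataResponse(msg, senderServerId);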
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java | 22
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java | 388 +++++++++++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 472 ++++---------------------------
3 files changed, 457 insertions(+), 425 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index 1a5e7bf..5993812 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -54,8 +54,8 @@
*/
private static final DebugTracer TRACER = getTracer();
- /** The domain we send monitoring for. */
- private final ReplicationServerDomain replicationServerDomain;
+  /** The replication domain we publish monitoring information for. */
+ private final ReplicationServerDomain domain;
/** Sleep time (in ms) before sending new monitoring messages. */
private volatile long period;
@@ -79,7 +79,7 @@
+ ") monitor publisher for domain \""
+ replicationServerDomain.getBaseDn() + "\"");
- this.replicationServerDomain = replicationServerDomain;
+ this.domain = replicationServerDomain;
this.period = period;
}
@@ -107,15 +107,10 @@
}
// Send global topology information to peer DSs
- MonitorData monitorData = replicationServerDomain
- .computeDomainMonitorData();
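+      // Create the monitor message once; the sender and destination ids
+      // are set for each DS in the loop below.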
+ MonitorMsg monitorMsg = domain.createGlobalTopologyMonitorMsg(0, 0);
+ final int localServerId = domain.getLocalRSServerId();
- MonitorMsg monitorMsg = replicationServerDomain
- .createGlobalTopologyMonitorMsg(0, 0, monitorData);
-
- int localServerId = replicationServerDomain.getLocalRSServerId();
- for (ServerHandler serverHandler : replicationServerDomain
- .getConnectedDSs().values())
+ for (ServerHandler serverHandler : domain.getConnectedDSs().values())
{
// Set the right sender and destination ids
monitorMsg.setSenderID(localServerId);
@@ -203,8 +198,7 @@
private String getMessage(String message)
{
- return "In RS " + replicationServerDomain.getLocalRSServerId()
- + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
- + message;
+ return "In RS " + domain.getLocalRSServerId() + ", for base dn "
+ + domain.getBaseDn() + ": " + message;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
new file mode 100644
index 0000000..ae49d6b
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -0,0 +1,388 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2011-2013 ForgeRock AS
+ */
+package org.opends.server.replication.server;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.opends.messages.Message;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.util.TimeThread;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
+/**
+ * This class maintains the monitor data for a replication domain: it sends
+ * monitor requests to the connected replication servers and consolidates
+ * their responses with the locally available data.
+ */
+class ReplicationDomainMonitor
+{
+
+ /**
+ * The monitor data consolidated over the topology.
+ */
+ private volatile MonitorData monitorData = new MonitorData();
+
+ /**
+ * This lock guards against multiple concurrent monitor data recalculation.
+ */
+ private final Object pendingMonitorLock = new Object();
+
+ /** Guarded by pendingMonitorLock. */
+ private long monitorDataLastBuildDate = 0;
+
+ /**
+ * The set of replication servers which are already known to be slow to send
+ * monitor data.
+ * <p>
+ * Guarded by pendingMonitorLock.
+ */
+ private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
+
+ /** This lock serializes updates to the pending monitor data. */
+ private final Object pendingMonitorDataLock = new Object();
+
+ /**
+ * Monitor data which is currently being calculated.
+ * <p>
+ * Guarded by pendingMonitorDataLock.
+ */
+ private MonitorData pendingMonitorData;
+
+ /**
+ * A set containing the IDs of servers from which we are currently expecting
+   * monitor responses. When a response is received from a server, we remove
+   * its ID from this set and count down the latch if the ID was present.
+ * <p>
+ * Guarded by pendingMonitorDataLock.
+ */
+ private final Set<Integer> pendingMonitorDataServerIDs =
+ new HashSet<Integer>();
+
+ /**
+   * This latch, when non-null, is used to count incoming responses as they
+   * arrive. Since incoming responses may arrive at any time, even when
+ * there is no pending monitor request, access to the latch must be guarded.
+ * <p>
+ * Guarded by pendingMonitorDataLock.
+ */
+ private CountDownLatch pendingMonitorDataLatch = null;
+
+ /**
+   * TODO: the remote monitor data cache lifetime is hard-coded to 500 ms;
+   * it should be made configurable.
+ */
+ private final long monitorDataLifeTime = 500;
+
+ /**
+ * The replication domain monitored by this class.
+ */
+ private final ReplicationServerDomain domain;
+
+
+ /**
+   * Builds a monitor for the provided replication server domain.
+   *
+   * @param replicationDomain
+   *          The replication domain that will be monitored by this class.
+ */
+ public ReplicationDomainMonitor(ReplicationServerDomain replicationDomain)
+ {
+ this.domain = replicationDomain;
+ }
+
+ /**
+ * Returns the latest monitor data available for this replication server
+ * domain.
+ *
+ * @return The latest monitor data available for this replication server
+ * domain, which is never {@code null}.
+ */
+ public MonitorData getMonitorData()
+ {
+ return monitorData;
+ }
+
+ /**
+ * Recomputes the monitor data for this replication server domain.
+ *
+ * @return The recomputed monitor data for this replication server domain.
+ * @throws InterruptedException
+ * If this thread is interrupted while waiting for a response.
+ */
+ public MonitorData computeDomainMonitorData() throws InterruptedException
+ {
+    // Only allow one monitor data recalculation at a time.
+ synchronized (pendingMonitorLock)
+ {
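+      // Recompute only when the cached data has expired; otherwise the
+      // current cached snapshot is returned below.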
+ if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
+ .getTime())
+ {
+ try
+ {
+          // Prevent out-of-band monitor responses from updating our pending
+          // data until we are ready.
+ synchronized (pendingMonitorDataLock)
+ {
+ // Clear the pending monitor data.
+ pendingMonitorDataServerIDs.clear();
+ pendingMonitorData = new MonitorData();
+
+ initializePendingMonitorData();
+
+ // Send the monitor requests to the connected replication servers.
+ for (ServerHandler rs : domain.getConnectedRSs().values())
+ {
+ final int serverId = rs.getServerId();
+
+ MonitorRequestMsg msg =
+ new MonitorRequestMsg(domain.getLocalRSServerId(), serverId);
+ try
+ {
+ rs.send(msg);
+
+              // Only register this server ID in the pending set if we were
+              // able to send the message.
+ pendingMonitorDataServerIDs.add(serverId);
+ }
+ catch (IOException e)
+ {
+              // Log a message and continue on a best-effort basis.
+ Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get(
+ domain.getBaseDn(), serverId, e.getMessage());
+ logError(message);
+ }
+ }
+
+ // Create the pending response latch based on the number of expected
+ // monitor responses.
+ pendingMonitorDataLatch =
+ new CountDownLatch(pendingMonitorDataServerIDs.size());
+ }
+
+        // Wait up to 5 seconds for the responses to come back.
+ pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
+
+ // Log messages for replication servers that have gone or come back.
+ synchronized (pendingMonitorDataLock)
+ {
+ // Log servers that have come back.
+ for (int serverId : monitorDataLateServers)
+ {
+ // Ensure that we only log once per server: don't fill the
+ // error log with repeated messages.
+ if (!pendingMonitorDataServerIDs.contains(serverId))
+ {
+ logError(NOTE_MONITOR_DATA_RECEIVED.get(
+ domain.getBaseDn(), serverId));
+ }
+ }
+
+ // Log servers that have gone away.
+ for (int serverId : pendingMonitorDataServerIDs)
+ {
+ // Ensure that we only log once per server: don't fill the
+ // error log with repeated messages.
+ if (!monitorDataLateServers.contains(serverId))
+ {
+ logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(
+ domain.getBaseDn(), serverId));
+ }
+ }
+
+ // Remember which servers were late this time.
+ monitorDataLateServers.clear();
+ monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
+ }
+
+        // Store the newly computed data as the reference
+ synchronized (pendingMonitorDataLock)
+ {
+ // Now we have the expected answers or an error occurred
+ pendingMonitorData.completeComputing();
+ monitorData = pendingMonitorData;
+ monitorDataLastBuildDate = TimeThread.getTime();
+ }
+ }
+ finally
+ {
+ synchronized (pendingMonitorDataLock)
+ {
+ // Clear pending state.
+ pendingMonitorData = null;
+ pendingMonitorDataLatch = null;
+ pendingMonitorDataServerIDs.clear();
+ }
+ }
+ }
+ }
+
+ return monitorData;
+ }
+
+ /**
+   * Initializes the pending monitor data with the information that is
+   * available locally: the directly connected DSs and the local DB state.
+ */
+ private void initializePendingMonitorData()
+ {
+    // Process our directly connected DSs:
+    // - in the ServerHandler for a given DS1, the stored state contains:
+    //   -- the max CN produced by DS1
+    //   -- the last CN consumed by DS1 from DS2..n
+    // - in the RS domain/dbHandler, the built-in state contains:
+    //   -- the max CN produced by each server
+    // So for a given connected DS, we can take the state and the max CN
+    // from the DS state.
+
+ for (ServerHandler ds : domain.getConnectedDSs().values())
+ {
+ final int serverId = ds.getServerId();
+ final ServerState dsState = ds.getServerState().duplicate();
+
+ ChangeNumber maxcn = dsState.getChangeNumber(serverId);
+ if (maxcn == null)
+ {
+        // This directly connected DS has never produced any change.
+ maxcn = new ChangeNumber(0, 0, serverId);
+ }
+ pendingMonitorData.setMaxCN(serverId, maxcn);
+ pendingMonitorData.setLDAPServerState(serverId, dsState);
+ pendingMonitorData.setFirstMissingDate(serverId,
+ ds.getApproxFirstMissingDate());
+ }
+
+    // Then initialize the max CN for each server that has produced changes,
+    // from our own local db state, whether those servers are directly or
+    // indirectly connected.
+ final ServerState dbServerState = domain.getDbServerState();
+ pendingMonitorData.setRSState(domain.getLocalRSServerId(), dbServerState);
+ for (int serverId : dbServerState)
+ {
+ ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
+ pendingMonitorData.setMaxCN(serverId, storedCN);
+ }
+ }
+
+ /**
+   * Processes a monitor message received from a remote replication server
+   * and stores the data it contains.
+ *
+ * @param msg
+ * The message to be processed.
+ * @param serverId
+   *          The server ID of the server handler that received the message.
+ */
+ public void receiveMonitorDataResponse(MonitorMsg msg, int serverId)
+ {
+ synchronized (pendingMonitorDataLock)
+ {
+ if (pendingMonitorData == null)
+ {
+ // This is a response for an earlier request whose computing is
+ // already complete.
+ logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(domain.getBaseDn(),
+ msg.getSenderID()));
+ return;
+ }
+
+ try
+ {
+        // The RS state is a list of <serverId, lastChangeNumber> pairs.
+        // For each LDAP server, we keep the max CN across all RSes.
+        ServerState replServerState = msg.getReplServerDbState();
+        pendingMonitorData.setMaxCNs(replServerState);
+
+        // Store the remote RS state.
+        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
+
+        // Store the states of the remote LDAP servers.
+ for (int dsServerId : toIterable(msg.ldapIterator()))
+ {
+ ServerState dsServerState = msg.getLDAPServerState(dsServerId);
+ pendingMonitorData.setMaxCNs(dsServerState);
+ pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
+ pendingMonitorData.setFirstMissingDate(dsServerId,
+ msg.getLDAPApproxFirstMissingDate(dsServerId));
+ }
+
+ // Process the latency reported by the remote RSi on its connections
+ // to the other RSes
+ for (int rsServerId : toIterable(msg.rsIterator()))
+ {
+ long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
+ if (rsServerId == domain.getLocalRSServerId())
+ {
+            // This is the latency of the remote RSi regarding the current
+            // RS: update the first missing date of the locally connected DSs.
+ for (DataServerHandler ds : domain.getConnectedDSs().values())
+ {
+ int connectedServerId = ds.getServerId();
+ pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
+ }
+ }
+ else
+ {
+            // This is the latency of the remote RSi regarding another RSj:
+            // update the latency of the DSs connected to RSj.
+ ReplicationServerHandler rsjHdr =
+ domain.getConnectedRSs().get(rsServerId);
+ if (rsjHdr != null)
+ {
+ for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
+ {
+ pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
+ }
+ }
+ }
+ }
+ }
+ catch (RuntimeException e)
+ {
+ // FIXME: do we really expect these???
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+ e.getMessage() + stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ // Decreases the number of expected responses and potentially
+ // wakes up the waiting requester thread.
+ if (pendingMonitorDataServerIDs.remove(serverId))
+ {
+ pendingMonitorDataLatch.countDown();
+ }
+ }
+ }
+ }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3a10c23..2bedd43 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -32,7 +32,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,7 +50,6 @@
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
-import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -92,6 +90,11 @@
*/
private AtomicReference<MonitoringPublisher> monitoringPublisher =
new AtomicReference<MonitoringPublisher>();
+ /**
+ * Maintains monitor data for the current domain.
+ */
+  private final ReplicationDomainMonitor domainMonitor =
+      new ReplicationDomainMonitor(this);
/**
* The following map contains one balanced tree for each replica ID to which
@@ -128,63 +131,6 @@
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
- // Monitor data management
- /**
- * The monitor data consolidated over the topology.
- */
- private volatile MonitorData monitorData = new MonitorData();
-
- /**
- * This lock guards against multiple concurrent monitor data recalculation.
- */
- private final Object pendingMonitorLock = new Object();
-
- /** Guarded by pendingMonitorLock. */
- private long monitorDataLastBuildDate = 0;
-
- /**
- * The set of replication servers which are already known to be slow to send
- * monitor data.
- * <p>
- * Guarded by pendingMonitorLock.
- */
- private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
-
- /** This lock serializes updates to the pending monitor data. */
- private final Object pendingMonitorDataLock = new Object();
-
- /**
- * Monitor data which is currently being calculated. Guarded by
- * pendingMonitorDataLock.
- */
- private MonitorData pendingMonitorData;
-
- /**
- * A set containing the IDs of servers from which we are currently expecting
- * monitor responses. When a response is received from a server we remove the
- * ID from this table, and count down the latch if the ID was in the table.
- * <p>
- * Guarded by pendingMonitorDataLock.
- */
- private final Set<Integer> pendingMonitorDataServerIDs =
- new HashSet<Integer>();
-
- /**
- * This latch is non-null and is used in order to count incoming responses as
- * they arrive. Since incoming response may arrive at any time, even when
- * there is no pending monitor request, access to the latch must be guarded.
- * <p>
- * Guarded by pendingMonitorDataLock.
- */
- private CountDownLatch pendingMonitorDataLatch = null;
-
- /**
- * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
- */
- private final long monitorDataLifeTime = 500;
-
-
-
/**
* The needed info for each received assured update message we are waiting
* acks for.
@@ -251,11 +197,7 @@
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
-
- if (generationId < 0)
- {
- generationId = sourceHandler.getGenerationId();
- }
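+    // Adopt the source server's generation ID if this domain's is unset.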
+ setGenerationIdIfUnset(sourceHandler.getGenerationId());
/**
* If this is an assured message (a message requesting ack), we must
@@ -424,19 +366,17 @@
{
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
{
- TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
- + update.getChangeNumber()
+ TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
+ " will not be sent to directory server "
+ dsHandler.getServerId() + " with generation id "
+ dsHandler.getGenerationId() + " different from local "
- + "generation id " + generationId);
+ + "generation id " + generationId));
}
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
{
- TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
- + " for dn " + baseDn + ", update " + update.getChangeNumber()
+ TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
+ " will not be sent to directory server "
- + dsHandler.getServerId() + " as it is in full update");
+ + dsHandler.getServerId() + " as it is in full update"));
}
}
@@ -869,11 +809,9 @@
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
if (debugEnabled())
{
- TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
- + " for "+ baseDn
- + ", sending timeout for assured update with change "
- + " number " + cn + " to server id "
- + origServer.getServerId());
+ TRACER.debugInfo(getMessage(
+ "sending timeout for assured update with change number " + cn
+ + " to server id " + origServer.getServerId()));
}
try
{
@@ -1112,8 +1050,7 @@
unregisterServerHandler(sHandler);
sHandler.shutdown();
- // Check if generation id has to be reset
- mayResetGenerationId();
+ resetGenerationIdIfPossible();
if (!shutdown)
{
if (isDirectoryServer)
@@ -1211,16 +1148,13 @@
* - traverse replicationServers list and test for each if DS are connected
* So it strongly relies on the directoryServers list
*/
- private void mayResetGenerationId()
+ private void resetGenerationIdIfPossible()
{
- String prefix =
- "In RS " + this.localReplicationServer.getMonitorInstanceName()
- + " for " + baseDn + " ";
-
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus="
- + generationIdSavedStatus);
+ TRACER.debugInfo(getMessage(
+          "resetGenerationIdIfPossible generationIdSavedStatus="
+ + generationIdSavedStatus));
}
// If there is no more any LDAP server connected to this domain in the
@@ -1235,9 +1169,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "mayResetGenerationId skip RS "
+        TRACER.debugInfo(getMessage("resetGenerationIdIfPossible skip RS "
+ rsHandler.getMonitorInstanceName()
- + " that has different genId");
+ + " that has different genId"));
}
}
else if (rsHandler.hasRemoteLDAPServers())
@@ -1246,10 +1180,10 @@
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "mayResetGenerationId RS "
+          TRACER.debugInfo(getMessage("resetGenerationIdIfPossible RS "
+ rsHandler.getMonitorInstanceName()
+ " has ldap servers connected to it"
- + " - will not reset generationId");
+ + " - will not reset generationId"));
}
break;
}
@@ -1261,13 +1195,13 @@
if (debugEnabled())
{
- TRACER.debugInfo(prefix + "has ldap servers connected to it"
- + " - will not reset generationId");
+ TRACER.debugInfo(getMessage("has ldap servers connected to it"
+ + " - will not reset generationId"));
}
}
if (!ldapServersConnectedInTheTopology
- && !this.generationIdSavedStatus
+ && !generationIdSavedStatus
&& generationId != -1)
{
changeGenerationId(-1, false);
@@ -1544,7 +1478,8 @@
} else if (msg instanceof MonitorMsg)
{
MonitorMsg monitorMsg = (MonitorMsg) msg;
- receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
+ domainMonitor.receiveMonitorDataResponse(monitorMsg,
+ msgEmitter.getServerId());
} else
{
replyWithUnroutableMsgType(msgEmitter, msg);
@@ -1573,10 +1508,10 @@
if (msgEmitter.isDataServer())
{
// Monitoring information requested by a DS
- MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID(), monitorData);
try
{
+ MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID());
msgEmitter.send(monitorMsg);
}
catch (IOException e)
@@ -1711,33 +1646,25 @@
* The sender of this message.
* @param destination
* The destination of this message.
- * @param monitorData
- * The domain monitor data which should be used for the message.
* @return The newly created and filled MonitorMsg. Null if a problem occurred
* during message creation.
*/
- public MonitorMsg createGlobalTopologyMonitorMsg(
- int sender, int destination, MonitorData monitorData)
+ public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
{
final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
-
returnMsg.setReplServerDbState(getDbServerState());
- // Add the informations about the Replicas currently in the topology.
- Iterator<Integer> it = monitorData.ldapIterator();
- while (it.hasNext())
+ // Add the server state for each DS and RS currently in the topology.
+ final MonitorData monitorData = getDomainMonitorData();
+ for (int replicaId : toIterable(monitorData.ldapIterator()))
{
- int replicaId = it.next();
returnMsg.setServerState(replicaId,
monitorData.getLDAPServerState(replicaId),
monitorData.getApproxFirstMissingDate(replicaId), true);
}
- // Add the information about the RSs currently in the topology.
- it = monitorData.rsIterator();
- while (it.hasNext())
+ for (int replicaId : toIterable(monitorData.rsIterator()))
{
- int replicaId = it.next();
returnMsg.setServerState(replicaId,
monitorData.getRSStates(replicaId),
monitorData.getRSApproxFirstMissingDate(replicaId), false);
@@ -1774,27 +1701,22 @@
try
{
- MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+ final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+ monitorMsg.setReplServerDbState(getDbServerState());
- // Populate for each connected LDAP Server
- // from the states stored in the serverHandler.
- // - the server state
- // - the older missing change
+ // Add the server state for each connected DS and RS.
for (DataServerHandler dsHandler : this.connectedDSs.values())
{
monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
.getServerState(), dsHandler.getApproxFirstMissingDate(), true);
}
- // Same for the connected RS
for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
{
monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
.getServerState(), rsHandler.getApproxFirstMissingDate(), false);
}
- // Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerDbState(getDbServerState());
return monitorMsg;
}
finally
@@ -2060,7 +1982,6 @@
if (this.generationId != generationId)
{
- // we are changing of genId
clearDbs();
this.generationId = generationId;
@@ -2187,10 +2108,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " + getLocalRSServerId() +
- " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
- " for baseDn " + baseDn + ":\n" + csMsg);
+ TRACER.debugInfo(getMessage("receiving ChangeStatusMsg from "
+ + senderHandler.getServerId() + ":\n" + csMsg));
}
try
@@ -2421,9 +2340,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In RS " + getLocalRSServerId()
- + " Receiving TopologyMsg from " + rsHandler.getServerId()
- + " for baseDn " + baseDn + ":\n" + topoMsg);
+ TRACER.debugInfo(getMessage("receiving TopologyMsg from "
+ + rsHandler.getServerId() + ":\n" + topoMsg));
}
try
@@ -2448,12 +2366,8 @@
// Handle generation id
if (allowResetGenId)
{
- // Check if generation id has to be reset
- mayResetGenerationId();
- if (generationId < 0)
- {
- generationId = rsHandler.getGenerationId();
- }
+ resetGenerationIdIfPossible();
+ setGenerationIdIfUnset(rsHandler.getGenerationId());
}
if (isDifferentGenerationId(rsHandler.getGenerationId()))
@@ -2487,10 +2401,13 @@
}
}
- /* =======================
- * Monitor Data generation
- * =======================
- */
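+  /**
+   * Sets the provided generation ID for this replication domain if the
+   * generation ID has not been set yet.
+   *
+   * @param generationId
+   *          the generation ID to set
+   */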
+ private void setGenerationIdIfUnset(long generationId)
+ {
+ if (this.generationId < 0)
+ {
+ this.generationId = generationId;
+ }
+ }
/**
* Returns the latest monitor data available for this replication server
@@ -2501,274 +2418,7 @@
*/
MonitorData getDomainMonitorData()
{
- return monitorData;
- }
-
-
-
- /**
- * Recomputes the monitor data for this replication server domain.
- *
- * @return The recomputed monitor data for this replication server domain.
- * @throws InterruptedException
- * If this thread is interrupted while waiting for a response.
- */
- MonitorData computeDomainMonitorData() throws InterruptedException
- {
- // Only allow monitor recalculation at a time.
- synchronized (pendingMonitorLock)
- {
- if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
- .getTime())
- {
- try
- {
- // Prevent out of band monitor responses from updating our pending
- // table until we are ready.
- synchronized (pendingMonitorDataLock)
- {
- // Clear the pending monitor data.
- pendingMonitorDataServerIDs.clear();
- pendingMonitorData = new MonitorData();
-
- // Initialize the monitor data.
- initializePendingMonitorData();
-
- // Send the monitor requests to the connected replication servers.
- for (ReplicationServerHandler rs : connectedRSs.values())
- {
- // Add server ID to pending table.
- int serverId = rs.getServerId();
-
- MonitorRequestMsg msg = new MonitorRequestMsg(
- this.localReplicationServer.getServerId(), serverId);
- try
- {
- rs.send(msg);
-
- // Only register this server ID if we were able to send the
- // message.
- pendingMonitorDataServerIDs.add(serverId);
- }
- catch (IOException e)
- {
- // Log a message and do a best effort from here.
- Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
- .get(baseDn, serverId, e.getMessage());
- logError(message);
- }
- }
-
- // Create the pending response latch based on the number of expected
- // monitor responses.
- pendingMonitorDataLatch = new CountDownLatch(
- pendingMonitorDataServerIDs.size());
- }
-
- // Wait for the responses to come back.
- pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
-
- // Log messages for replication servers that have gone or come back.
- synchronized (pendingMonitorDataLock)
- {
- // Log servers that have come back.
- for (int serverId : monitorDataLateServers)
- {
- // Ensure that we only log once per server: don't fill the
- // error log with repeated messages.
- if (!pendingMonitorDataServerIDs.contains(serverId))
- {
- logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
- serverId));
- }
- }
-
- // Log servers that have gone away.
- for (int serverId : pendingMonitorDataServerIDs)
- {
- // Ensure that we only log once per server: don't fill the
- // error log with repeated messages.
- if (!monitorDataLateServers.contains(serverId))
- {
- logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
- serverId));
- }
- }
-
- // Remember which servers were late this time.
- monitorDataLateServers.clear();
- monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
- }
-
- // Store the new computed data as the reference
- synchronized (pendingMonitorDataLock)
- {
- // Now we have the expected answers or an error occurred
- pendingMonitorData.completeComputing();
- monitorData = pendingMonitorData;
- monitorDataLastBuildDate = TimeThread.getTime();
- }
- }
- finally
- {
- synchronized (pendingMonitorDataLock)
- {
- // Clear pending state.
- pendingMonitorData = null;
- pendingMonitorDataLatch = null;
- pendingMonitorDataServerIDs.clear();
- }
- }
- }
- }
-
- return monitorData;
- }
-
-
-
- /**
- * Start collecting global monitoring information for this
- * ReplicationServerDomain.
- */
-
- private void initializePendingMonitorData()
- {
- // Let's process our directly connected DS
- // - in the ServerHandler for a given DS1, the stored state contains :
- // - the max CN produced by DS1
- // - the last CN consumed by DS1 from DS2..n
- // - in the RSdomain/dbHandler, the built-in state contains :
- // - the max CN produced by each server
- // So for a given DS connected we can take the state and the max from
- // the DS/state.
-
- for (ServerHandler ds : connectedDSs.values())
- {
- int serverID = ds.getServerId();
-
- // the state comes from the state stored in the SH
- ServerState dsState = ds.getServerState().duplicate();
-
- // the max CN sent by that LS also comes from the SH
- ChangeNumber maxcn = dsState.getChangeNumber(serverID);
- if (maxcn == null)
- {
- // This directly connected LS has never produced any change
- maxcn = new ChangeNumber(0, 0, serverID);
- }
- pendingMonitorData.setMaxCN(serverID, maxcn);
- pendingMonitorData.setLDAPServerState(serverID, dsState);
- pendingMonitorData.setFirstMissingDate(serverID,
- ds.getApproxFirstMissingDate());
- }
-
- // Then initialize the max CN for the LS that produced something
- // - from our own local db state
- // - whatever they are directly or indirectly connected
- ServerState dbServerState = getDbServerState();
- pendingMonitorData.setRSState(localReplicationServer.getServerId(),
- dbServerState);
- for (int serverId : dbServerState) {
- ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
- pendingMonitorData.setMaxCN(serverId, storedCN);
- }
- }
-
-
-
- /**
- * Processes a Monitor message receives from a remote Replication Server and
- * stores the data received.
- *
- * @param msg
- * The message to be processed.
- * @param serverId
- * server handler that is receiving the message.
- */
- private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
- {
- synchronized (pendingMonitorDataLock)
- {
- if (pendingMonitorData == null)
- {
- // This is a response for an earlier request whose computing is
- // already complete.
- logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
- msg.getSenderID()));
- return;
- }
-
- try
- {
- // Here is the RS state : list <serverID, lastChangeNumber>
- // For each LDAP Server, we keep the max CN across the RSes
- ServerState replServerState = msg.getReplServerDbState();
- pendingMonitorData.setMaxCNs(replServerState);
-
- // store the remote RS states.
- pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
-
- // Store the remote LDAP servers states
- Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
- while (dsServerIdIterator.hasNext())
- {
- int dsServerId = dsServerIdIterator.next();
- ServerState dsServerState = msg.getLDAPServerState(dsServerId);
- pendingMonitorData.setMaxCNs(dsServerState);
- pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
- pendingMonitorData.setFirstMissingDate(dsServerId,
- msg.getLDAPApproxFirstMissingDate(dsServerId));
- }
-
- // Process the latency reported by the remote RSi on its connections
- // to the other RSes
- Iterator<Integer> rsServerIdIterator = msg.rsIterator();
- while (rsServerIdIterator.hasNext())
- {
- int rsServerId = rsServerIdIterator.next();
- long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
- if (rsServerId == localReplicationServer.getServerId())
- {
- // this is the latency of the remote RSi regarding the current RS
- // let's update the fmd of my connected LS
- for (DataServerHandler connectedDS : connectedDSs.values())
- {
- int connectedServerId = connectedDS.getServerId();
- pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
- }
- }
- else
- {
- // this is the latency of the remote RSi regarding another RSj
- // let's update the latency of the LSes connected to RSj
- ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
- if (rsjHdr != null)
- {
- for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
- {
- pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
- }
- }
- }
- }
- }
- catch (RuntimeException e)
- {
- // FIXME: do we really expect these???
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
- e.getMessage() + stackTraceToSingleLineString(e)));
- }
- finally
- {
- // Decreases the number of expected responses and potentially
- // wakes up the waiting requester thread.
- if (pendingMonitorDataServerIDs.remove(serverId))
- {
- pendingMonitorDataLatch.countDown();
- }
- }
- }
+ return domainMonitor.getMonitorData();
}
/**
@@ -2791,7 +2441,7 @@
*/
public Map<Integer, DataServerHandler> getConnectedDSs()
{
- return connectedDSs;
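+    // Return a read-only view so callers cannot modify the internal map.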
+ return Collections.unmodifiableMap(connectedDSs);
}
/**
@@ -2800,7 +2450,7 @@
*/
public Map<Integer, ReplicationServerHandler> getConnectedRSs()
{
- return connectedRSs;
+ return Collections.unmodifiableMap(connectedRSs);
}
@@ -2960,19 +2610,13 @@
String.valueOf(localReplicationServer.getServerId())));
attributes.add(Attributes.create("replication-server-port",
String.valueOf(localReplicationServer.getReplicationPort())));
-
- // Add all the base DNs that are known by this replication server.
attributes.add(Attributes.create("domain-name", baseDn));
-
- // Publish to monitor the generation ID by replicationServerDomain
attributes.add(Attributes.create("generation-id",
baseDn + " " + generationId));
- MonitorData md = getDomainMonitorData();
-
// Missing changes
- long missingChanges = md.getMissingChangesRS(localReplicationServer
- .getServerId());
+ long missingChanges = getDomainMonitorData().getMissingChangesRS(
+ localReplicationServer.getServerId());
attributes.add(Attributes.create("missing-changes",
String.valueOf(missingChanges)));
@@ -3439,4 +3083,10 @@
// connection): store handler.
connectedRSs.put(rsHandler.getServerId(), rsHandler);
}
+
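+  /**
+   * Returns the provided message prefixed with the local RS server ID and
+   * the base DN of this replication domain.
+   *
+   * @param message
+   *          the message to prefix
+   * @return the prefixed message
+   */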
+ private String getMessage(String message)
+ {
+ return "In RS " + localReplicationServer.getServerId() + " for " + baseDn
+ + ": " + message;
+ }
}
--
Gitblit v1.10.0