From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS, we introduce:
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 201 insertions(+), 35 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index b4403da..71f4386 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -101,6 +99,7 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
+import java.util.Collections;
/**
* ReplicationServer Listener.
@@ -173,6 +172,10 @@
// the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
private int degradedStatusThreshold = 5000;
+ // Number of milliseconds to wait before sending new monitoring messages.
+ // If value is 0, monitoring publisher is disabled
+ private long monitoringPublisherPeriod = 3000;
+
// The handler of the draft change numbers database, the database used to
// store the relation between a draft change number ('seqnum') and the
// associated cookie.
@@ -211,6 +214,13 @@
private int weight = 1;
/**
+ * Holds the list of all replication servers instantiated in this VM.
+ * This allows to perform clean up of the RS databases in unit tests.
+ */
+ private static List<ReplicationServer> allInstances =
+ new ArrayList<ReplicationServer>();
+
+ /**
* Creates a new Replication server using the provided configuration entry.
*
* @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
groupId = (byte)configuration.getGroupId();
assuredTimeout = configuration.getAssuredTimeout();
degradedStatusThreshold = configuration.getDegradedStatusThreshold();
+ monitoringPublisherPeriod = configuration.getMonitoringPeriod();
replSessionSecurity = new ReplSessionSecurity();
initialize(replicationPort);
@@ -274,8 +285,20 @@
DirectoryServer.registerImportTaskListener(this);
localPorts.add(replicationPort);
+
+ // Keep track of this new instance
+ allInstances.add(this);
}
+ /**
+ * Get the list of every replication servers instantiated in the current VM.
+ * @return The list of every replication servers instantiated in the current
+ * VM.
+ */
+ public static List<ReplicationServer> getAllInstances()
+ {
+ return allInstances;
+ }
/**
* The run method for the Listen thread.
@@ -850,7 +873,9 @@
dbEnv.shutdown();
}
-}
+ // Remove this instance from the global instance list
+ allInstances.remove(this);
+ }
/**
@@ -1028,6 +1053,32 @@
}
}
+ // Update period value for monitoring publishers (stop them if requested
+ // value is 0)
+ if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+ {
+ long oldMonitoringPeriod = monitoringPublisherPeriod;
+ monitoringPublisherPeriod = configuration.getMonitoringPeriod();
+ for(ReplicationServerDomain rsd : baseDNs.values())
+ {
+ if (monitoringPublisherPeriod == 0L)
+ {
+ // Requested to stop monitoring publishers
+ rsd.stopMonitoringPublisher();
+ } else if (rsd.isRunningMonitoringPublisher())
+ {
+ // Update the threshold value for this running monitoring publisher
+ rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
+ } else if (oldMonitoringPeriod == 0L)
+ {
+ // Requested to start monitoring publishers with provided period value
+ if ( (rsd.getConnectedDSs().size() > 0) ||
+ (rsd.getConnectedRSs().size() > 0) )
+ rsd.startMonitoringPublisher();
+ }
+ }
+ }
+
// Changed the group id ?
byte newGroupId = (byte)configuration.getGroupId();
if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
if (weight != configuration.getWeight())
{
weight = configuration.getWeight();
- // TODO: send new TopologyMsg
+ // Broadcast the new weight the the whole topology. This will make some
+ // DSs reconnect (if needed) to other RSs according to the new weight of
+ // this RS.
+ broadcastConfigChange();
}
if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
}
/**
+ * Broadcast a configuration change that just happened to the whole topology
+ * by sending a TopologyMsg to every entity in the topology.
+ */
+ private void broadcastConfigChange()
+ {
+ for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+ {
+ replicationServerDomain.buildAndSendTopoInfoToDSs(null);
+ replicationServerDomain.buildAndSendTopoInfoToRSs();
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
}
/**
+ * Get the monitoring publisher period value.
+ * @return the monitoring publisher period value.
+ */
+ public long getMonitoringPublisherPeriod()
+ {
+ return monitoringPublisherPeriod;
+ }
+
+ /**
* Compute the list of replication servers that are not any
* more connected to this Replication Server and stop the
* corresponding handlers.
@@ -1411,12 +1487,80 @@
/* The date of the last time they have been elaborated */
private long monitorDataLastBuildDate = 0;
- /* Search op on monitor data is processed by a worker thread.
- * Requests are sent to the other RS,and responses are received by the
- * listener threads.
- * The worker thread is awoke on this semaphore, or on timeout.
+ /**
+ * This uniquely identifies a server (handler) in the cross-domain topology.
+ * Represents an identifier of a handler (in the whole RS) we have to wait a
+ * monitoring message from before answering to a monitor request.
*/
- Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+ public static class GlobalServerId {
+
+ private int serverId = -1;
+ private String baseDn = null;
+
+ /**
+ * Constructor for a global server id.
+ * @param baseDn The dn of the RSD owning the handler.
+ * @param serverId The handler id in the matching RSD.
+ */
+ public GlobalServerId(String baseDn, int serverId) {
+ this.baseDn = baseDn;
+ this.serverId = serverId;
+ }
+
+ /**
+ * Get the server handler id.
+ * @return the serverId
+ */
+ public int getServerId()
+ {
+ return serverId;
+ }
+
+ /**
+ * Get the base dn.
+ * @return the baseDn
+ */
+ public String getBaseDn()
+ {
+ return baseDn;
+ }
+
+ /**
+ * Get the hascode.
+ * @return The hashcode.
+ */
+ @Override
+ public int hashCode()
+ {
+ int hash = 7;
+ hash = 43 * hash + this.serverId;
+ hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
+ return hash;
+ }
+
+ /**
+ * Tests if the passed global server handler id represents the same server
+ * handler as this one.
+ * @param obj The object to test.
+ * @return True if both identifiers are the same.
+ */
+ public boolean equals(Object obj) {
+ if ( (obj == null) || (obj instanceof GlobalServerId))
+ return false;
+
+ GlobalServerId globalServerId = (GlobalServerId)obj;
+ return ( globalServerId.baseDn.equals(baseDn) &&
+ (globalServerId.serverId == serverId) );
+ }
+ }
+
+ /**
+ * This gives the list of server handlers we are willing to wait monitoring
+ * message from. Each time a monitoring message is received by a server
+ * handler, the matching server handler id is retired from the list. When the
+ * list is empty, we received all expected monitoring messages.
+ */
+ private List<GlobalServerId> expectedMonitoringMsg = null;
/**
* Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
*
* @throws DirectoryException If the computation cannot be achieved.
*/
- public void computeMonitorData() throws DirectoryException
+ public synchronized void computeMonitorData() throws DirectoryException
{
if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
{
@@ -1440,15 +1584,17 @@
return;
}
- remoteMonitorResponsesSemaphore.drainPermits();
- int count = 0;
+ // Initialize the list of server handlers we expect monitoring messages from
+ expectedMonitoringMsg =
+ Collections.synchronizedList(new ArrayList<GlobalServerId>());
+
for (ReplicationServerDomain domain : baseDNs.values())
{
- count += domain.initializeMonitorData();
+ domain.initializeMonitorData(expectedMonitoringMsg);
}
// Wait for responses
- waitMonitorDataResponses(count);
+ waitMonitorDataResponses();
for (ReplicationServerDomain domain : baseDNs.values())
{
@@ -1457,38 +1603,51 @@
}
/**
- * Wait for the expected count of received MonitorMsg.
- * @param expectedResponses The number of expected answers.
+ * Wait for the expected received MonitorMsg.
* @throws DirectoryException When an error occurs.
*/
- private void waitMonitorDataResponses(int expectedResponses)
+ private void waitMonitorDataResponses()
throws DirectoryException
{
try
{
if (debugEnabled())
TRACER.debugInfo(
- "In " + getMonitorInstanceName() + " baseDn=" +
- " waiting for " + expectedResponses + " expected monitor messages");
+ "In " + getMonitorInstanceName() +
+ " waiting for " + expectedMonitoringMsg.size() +
+ " expected monitor messages");
- boolean allPermitsAcquired =
- remoteMonitorResponsesSemaphore.tryAcquire(
- expectedResponses,
- (long) 5000, TimeUnit.MILLISECONDS);
-
- if (!allPermitsAcquired)
+ // Wait up to 5 seconds for every expected monitoring message to come
+ // back.
+ boolean allReceived = false;
+ long startTime = TimeThread.getTime();
+ long curTime = startTime;
+ int maxTime = 5000;
+ while ( (curTime - startTime) < maxTime )
{
- monitorDataLastBuildDate = TimeThread.getTime();
+ // Have every expected monitoring messages arrived ?
+ if (expectedMonitoringMsg.size() == 0)
+ {
+ // Ok break the loop
+ allReceived = true;
+ break;
+ }
+ Thread.sleep(100);
+ curTime = TimeThread.getTime();
+ }
+
+ monitorDataLastBuildDate = TimeThread.getTime();
+
+ if (!allReceived)
+ {
logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
- // let's go on in best effort even with limited data received.
+ // let's go on in best effort even with limited data received.
} else
{
- monitorDataLastBuildDate = TimeThread.getTime();
if (debugEnabled())
TRACER.debugInfo(
- "In " + getMonitorInstanceName() + " baseDn=" +
- " Successfully received all " + expectedResponses +
- " expected monitor messages");
+ "In " + getMonitorInstanceName() +
+ " Successfully received all expected monitor messages");
}
} catch (Exception e)
{
@@ -1499,11 +1658,18 @@
/**
* This should be called by each ReplicationServerDomain that receives
- * a response to a monitor request message.
+ * a response to a monitor request message. This may also be called when a
+ * monitoring message is coming from a RS whose monitoring publisher thread
+ * sent it. As monitoring messages (sent because of monitoring request or
+ * because of monitoring publisher) have the same content, this is also ok
+ * to mark ok the server when the monitoring message coms from a monitoring
+ * publisher thread.
+ * @param globalServerId The server handler that is receiving the
+ * monitoring message.
*/
- public void responseReceived()
+ public void responseReceived(GlobalServerId globalServerId)
{
- remoteMonitorResponsesSemaphore.release();
+ expectedMonitoringMsg.remove(globalServerId);
}
@@ -1513,7 +1679,7 @@
*/
public void responseReceivedAll()
{
- remoteMonitorResponsesSemaphore.notifyAll();
+ expectedMonitoringMsg.clear();
}
/**
--
Gitblit v1.10.0