From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS, we introduce:
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 378 ++++++++++++++++++++++++++++++++++++-----------------
1 files changed, 255 insertions(+), 123 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4ef072c..ab1da75 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
+import org.opends.server.replication.server.
+ ReplicationServer.GlobalServerId;
/**
* This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
// late or not
private StatusAnalyzer statusAnalyzer = null;
+ // The monitoring publisher that periodically sends monitoring messages to the
+ // topology
+ private MonitoringPublisher monitoringPublisher = null;
+
/*
* The following map contains one balanced tree for each replica ID
* to which we are currently publishing
@@ -1066,6 +1072,17 @@
// Try doing job anyway...
}
+ // Stop useless monitoring publisher if no more RS or DS in domain
+ if ( (directoryServers.size() + replicationServers.size() )== 1)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " +
+ replicationServer.getMonitorInstanceName() +
+ " remote server " + handler.getMonitorInstanceName() + " is " +
+ "the last RS/DS to be stopped: stopping monitoring publisher");
+ stopMonitoringPublisher();
+ }
+
if (handler.isReplicationServer())
{
if (replicationServers.containsValue(handler))
@@ -1082,44 +1099,39 @@
buildAndSendTopoInfoToDSs(null);
}
}
- } else
+ } else if (directoryServers.containsValue(handler))
{
- if (directoryServers.containsValue(handler))
+ // If this is the last DS for the domain,
+ // shutdown the status analyzer
+ if (directoryServers.size() == 1)
{
- // If this is the last DS for the domain,
- // shutdown the status analyzer
- if (directoryServers.size() == 1)
- {
- if (debugEnabled())
- TRACER.debugInfo("In " +
- replicationServer.getMonitorInstanceName() +
- " remote server " + handler.getMonitorInstanceName() +
+ if (debugEnabled())
+ TRACER.debugInfo("In " +
+ replicationServer.getMonitorInstanceName() +
+ " remote server " + handler.getMonitorInstanceName() +
" is the last DS to be stopped: stopping status analyzer");
- stopStatusAnalyzer();
- }
+ stopStatusAnalyzer();
+ }
- unregisterServerHandler(handler);
- handler.shutdown();
+ unregisterServerHandler(handler);
+ handler.shutdown();
- // Check if generation id has to be reset
- mayResetGenerationId();
+ // Check if generation id has to be reset
+ mayResetGenerationId();
+ if (!shutdown)
+ {
// Update the remote replication servers with our list
// of connected LDAP servers
- if (!shutdown)
- {
- buildAndSendTopoInfoToRSs();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
- }
+ buildAndSendTopoInfoToRSs();
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
}
- else if (otherHandlers.contains(handler))
- {
- unRegisterHandler(handler);
- handler.shutdown();
- }
+ } else if (otherHandlers.contains(handler))
+ {
+ unRegisterHandler(handler);
+ handler.shutdown();
}
-
}
catch(Exception e)
{
@@ -1581,99 +1593,51 @@
// in the topology.
if (senderHandler.isDataServer())
{
- MonitorMsg returnMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID());
+ // Monitoring information requested by a DS
+ MonitorMsg monitorMsg =
+ createGlobalTopologyMonitorMsg(msg.getDestination(),
+ msg.getsenderID());
- try
+ if (monitorMsg != null)
{
- returnMsg.setReplServerDbState(getDbServerState());
- // Update the information we have about all servers
- // in the topology.
- MonitorData md = computeMonitorData();
-
- // Add the informations about the Replicas currently in
- // the topology.
- Iterator<Integer> it = md.ldapIterator();
- while (it.hasNext())
+ try
{
- int replicaId = it.next();
- returnMsg.setServerState(
- replicaId, md.getLDAPServerState(replicaId),
- md.getApproxFirstMissingDate(replicaId), true);
- }
-
- // Add the informations about the Replication Servers
- // currently in the topology.
- it = md.rsIterator();
- while (it.hasNext())
+ senderHandler.send(monitorMsg);
+ } catch (IOException e)
{
- int replicaId = it.next();
- returnMsg.setServerState(
- replicaId, md.getRSStates(replicaId),
- md.getRSApproxFirstMissingDate(replicaId), false);
+ // the connection was closed.
}
}
- catch (DirectoryException e)
- {
- // If we can't compute the Monitor Information, send
- // back an empty message.
- }
- try
- {
- senderHandler.send(returnMsg);
- } catch (IOException e)
- {
- // the connection was closed.
- }
return;
- }
-
- MonitorMsg monitorMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID());
-
- // Populate for each connected LDAP Server
- // from the states stored in the serverHandler.
- // - the server state
- // - the older missing change
- for (DataServerHandler lsh : this.directoryServers.values())
+ } else
{
- monitorMsg.setServerState(
- lsh.getServerId(),
- lsh.getServerState(),
- lsh.getApproxFirstMissingDate(),
- true);
- }
+ // Monitoring information requested by a RS
+ MonitorMsg monitorMsg =
+ createLocalTopologyMonitorMsg(msg.getDestination(),
+ msg.getsenderID());
- // Same for the connected RS
- for (ReplicationServerHandler rsh : this.replicationServers.values())
- {
- monitorMsg.setServerState(
- rsh.getServerId(),
- rsh.getServerState(),
- rsh.getApproxFirstMissingDate(),
- false);
- }
-
- // Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerDbState(this.getDbServerState());
-
-
- try
- {
- senderHandler.send(monitorMsg);
- } catch (Exception e)
- {
- // We log the error. The requestor will detect a timeout or
- // any other failure on the connection.
- logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
- Integer.toString((msg.getDestination()))));
+ if (monitorMsg != null)
+ {
+ try
+ {
+ senderHandler.send(monitorMsg);
+ } catch (Exception e)
+ {
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+ Integer.toString((msg.getDestination()))));
+ }
+ }
}
} else if (msg instanceof MonitorMsg)
{
MonitorMsg monitorMsg =
(MonitorMsg) msg;
- receivesMonitorDataResponse(monitorMsg);
+ GlobalServerId globalServerId =
+ new GlobalServerId(baseDn, senderHandler.getServerId());
+ receivesMonitorDataResponse(monitorMsg, globalServerId);
} else
{
logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
}
/**
+ * Creates a new monitor message including monitoring information for the
+ * whole topology.
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ * @return The newly created and filled MonitorMsg. Null if a problem occurred
+ * during message creation.
+ */
+ public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
+ {
+ MonitorMsg returnMsg =
+ new MonitorMsg(sender, destination);
+
+ try
+ {
+ returnMsg.setReplServerDbState(getDbServerState());
+ // Update the information we have about all servers
+ // in the topology.
+ MonitorData md = computeMonitorData();
+
+ // Add the informations about the Replicas currently in
+ // the topology.
+ Iterator<Integer> it = md.ldapIterator();
+ while (it.hasNext())
+ {
+ int replicaId = it.next();
+ returnMsg.setServerState(
+ replicaId, md.getLDAPServerState(replicaId),
+ md.getApproxFirstMissingDate(replicaId), true);
+ }
+
+ // Add the informations about the Replication Servers
+ // currently in the topology.
+ it = md.rsIterator();
+ while (it.hasNext())
+ {
+ int replicaId = it.next();
+ returnMsg.setServerState(
+ replicaId, md.getRSStates(replicaId),
+ md.getRSApproxFirstMissingDate(replicaId), false);
+ }
+ }
+ catch (DirectoryException e)
+ {
+ // If we can't compute the Monitor Information, send
+ // back an empty message.
+ }
+ return returnMsg;
+ }
+
+ /**
+ * Creates a new monitor message including monitoring information for the
+ * topology directly connected to this RS. This includes information for:
+ * - local RS
+ * - all direct DSs
+ * - all direct RSs
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ * @return The newly created and filled MonitorMsg. Null if a problem occurred
+ * during message creation.
+ */
+ public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+ {
+ MonitorMsg monitorMsg = null;
+
+ try {
+
+ // Lock domain as we need to go through connected servers list
+ lock();
+
+ monitorMsg = new MonitorMsg(sender, destination);
+
+
+ // Populate for each connected LDAP Server
+ // from the states stored in the serverHandler.
+ // - the server state
+ // - the older missing change
+ for (DataServerHandler lsh : this.directoryServers.values())
+ {
+ monitorMsg.setServerState(
+ lsh.getServerId(),
+ lsh.getServerState(),
+ lsh.getApproxFirstMissingDate(),
+ true);
+ }
+
+ // Same for the connected RS
+ for (ReplicationServerHandler rsh : this.replicationServers.values())
+ {
+ monitorMsg.setServerState(
+ rsh.getServerId(),
+ rsh.getServerState(),
+ rsh.getApproxFirstMissingDate(),
+ false);
+ }
+
+ // Populate the RS state in the msg from the DbState
+ monitorMsg.setReplServerDbState(this.getDbServerState());
+ } catch(InterruptedException e)
+ {
+ // At lock, too bad...
+ } finally
+ {
+ if (hasLock())
+ release();
+ }
+
+ return monitorMsg;
+ }
+
+ /**
* Shutdown this ReplicationServerDomain.
*/
public void shutdown()
@@ -1831,8 +1905,7 @@
/**
* Send a TopologyMsg to all the connected directory servers in order to
- * let.
- * them know the topology (every known DSs and RSs)
+ * let them know the topology (every known DSs and RSs).
* @param notThisOne If not null, the topology message will not be sent to
* this passed server.
*/
@@ -1931,10 +2004,11 @@
dsInfos.add(serverHandler.toDSInfo());
}
- // Create info for us (local RS)
+ // Create info for the local RS
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- generationId, replicationServer.getGroupId());
+ generationId, replicationServer.getGroupId(),
+ replicationServer.getWeight());
rsInfos.add(localRSInfo);
return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
// Add our own info (local RS)
RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- generationId, replicationServer.getGroupId());
+ generationId, replicationServer.getGroupId(),
+ replicationServer.getWeight());
rsInfos.add(localRSInfo);
// Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
* Start collecting global monitoring information for this
* ReplicationServerDomain.
*
- * @return The number of response that should come back.
+ * @param expectedMonitoringMsg The list of server handler we have to wait a
+ * monitoring message from. Will be filled as necessary by this method.
*
* @throws DirectoryException In case the monitoring information could
* not be collected.
*/
- int initializeMonitorData() throws DirectoryException
+ void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
+ throws DirectoryException
{
synchronized (monitorDataLock)
{
@@ -2539,7 +2616,7 @@
}
// Send the request for remote monitor data to the
- return sendMonitorDataRequest();
+ sendMonitorDataRequest(expectedMonitoringMsg);
}
/**
@@ -2566,22 +2643,25 @@
/**
* Sends a MonitorRequest message to all connected RS.
- * @return the number of requests sent.
+ * @param expectedMonitoringMsg The list of server handler we have to wait a
+ * monitoring message from. Will be filled as necessary by this method.
* @throws DirectoryException when a problem occurs.
*/
- protected int sendMonitorDataRequest()
+ protected void sendMonitorDataRequest(
+ List<GlobalServerId> expectedMonitoringMsg)
throws DirectoryException
{
- int sent = 0;
try
{
for (ServerHandler rs : replicationServers.values())
{
+ int serverId = rs.getServerId();
MonitorRequestMsg msg =
new MonitorRequestMsg(this.replicationServer.getServerId(),
- rs.getServerId());
+ serverId);
rs.send(msg);
- sent++;
+ // Store the fact that we expect a MonitoringMsg back from this server
+ expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
}
} catch (Exception e)
{
@@ -2590,7 +2670,6 @@
throw new DirectoryException(ResultCode.OTHER,
message, e);
}
- return sent;
}
/**
@@ -2598,8 +2677,10 @@
* and stores the data received.
*
* @param msg The message to be processed.
+ * @param globalServerHandlerId server handler that is receiving the message.
*/
- public void receivesMonitorDataResponse(MonitorMsg msg)
+ private void receivesMonitorDataResponse(MonitorMsg msg,
+ GlobalServerId globalServerId)
{
try
{
@@ -2677,7 +2758,7 @@
// Decreases the number of expected responses and potentially
// wakes up the waiting requestor thread.
- replicationServer.responseReceived();
+ replicationServer.responseReceived(globalServerId);
} catch (Exception e)
{
@@ -2832,6 +2913,57 @@
}
/**
+ * Starts the monitoring publisher for the domain.
+ */
+ public void startMonitoringPublisher()
+ {
+ if (monitoringPublisher == null)
+ {
+ long period =
+ replicationServer.getMonitoringPublisherPeriod();
+ if (period > 0) // 0 means no monitoring publisher
+ {
+ monitoringPublisher = new MonitoringPublisher(this, period);
+ monitoringPublisher.start();
+ }
+ }
+ }
+
+ /**
+ * Stops the monitoring publisher for the domain.
+ */
+ public void stopMonitoringPublisher()
+ {
+ if (monitoringPublisher != null)
+ {
+ monitoringPublisher.shutdown();
+ monitoringPublisher.waitForShutdown();
+ monitoringPublisher = null;
+ }
+ }
+
+ /**
+ * Tests if the monitoring publisher for this domain is running.
+ * @return True if the monitoring publisher is running, false otherwise.
+ */
+ public boolean isRunningMonitoringPublisher()
+ {
+ return (monitoringPublisher != null);
+ }
+
+ /**
+ * Update the monitoring publisher with the new period value.
+ * @param period The new period value.
+ */
+ public void updateMonitoringPublisher(long period)
+ {
+ if (monitoringPublisher != null)
+ {
+ monitoringPublisher.setPeriod(period);
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
--
Gitblit v1.10.0