From 40e2acfd1e9676f3b63385b15075bf1395d4543e Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 01 Feb 2008 13:21:19 +0000
Subject: [PATCH] Fix 2598 - fixes for global replication monitoring
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 294 ++++++++++++++++++++++++++++++++++++++--------------------
1 files changed, 193 insertions(+), 101 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 3b85cb1..0e759c0 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -124,12 +125,12 @@
/**
- * When this Handler is connected to a remote replication server
+ * When this Handler is related to a remote replication server
* this collection will contain as many elements as there are
* LDAP servers connected to the remote replication server.
*/
- private List<LightweightServerHandler>
- remoteLDAPservers = new ArrayList<LightweightServerHandler>();
+ private Map<Short, LightweightServerHandler> connectedServers =
+ new ConcurrentHashMap<Short, LightweightServerHandler>();
/**
* The time in milliseconds between heartbeats from the replication
@@ -200,6 +201,8 @@
maxRcvWindow = windowSize;
rcvWindow = windowSize;
long localGenerationId = -1;
+ boolean handshakeOnly = false;
+
try
{
if (baseDn != null)
@@ -244,6 +247,8 @@
maxSendQueue = receivedMsg.getMaxSendQueue();
heartbeatInterval = receivedMsg.getHeartbeatInterval();
+ handshakeOnly = receivedMsg.isHandshakeOnly();
+
// The session initiator decides whether to use SSL.
sslEncryption = receivedMsg.getSSLEncryption();
@@ -524,60 +529,70 @@
replicationServerDomain = replicationServer.
getReplicationServerDomain(this.baseDn,true);
- boolean started;
- if (serverIsLDAPserver)
+ if (!handshakeOnly)
{
- started = replicationServerDomain.startServer(this);
- }
- else
- {
- started = replicationServerDomain.startReplicationServer(this);
- }
-
- if (started)
- {
- // sendWindow MUST be created before starting the writer
- sendWindow = new Semaphore(sendWindowSize);
-
- writer = new ServerWriter(session, serverId,
- this, replicationServerDomain);
- reader = new ServerReader(session, serverId,
- this, replicationServerDomain);
-
- reader.start();
- writer.start();
-
- // Create a thread to send heartbeat messages.
- if (heartbeatInterval > 0)
+ boolean started;
+ if (serverIsLDAPserver)
{
- heartbeatThread = new HeartbeatThread(
- "replication Heartbeat to " + serverURL +
- " for " + this.baseDn,
- session, heartbeatInterval/3);
- heartbeatThread.start();
+ started = replicationServerDomain.startServer(this);
+ }
+ else
+ {
+ started = replicationServerDomain.startReplicationServer(this);
}
-
- DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
- DirectoryServer.registerMonitorProvider(this);
- }
- else
- {
- // the connection is not valid, close it.
- try
+ if (started)
{
- if (debugEnabled())
+ // sendWindow MUST be created before starting the writer
+ sendWindow = new Semaphore(sendWindowSize);
+
+ writer = new ServerWriter(session, serverId,
+ this, replicationServerDomain);
+ reader = new ServerReader(session, serverId,
+ this, replicationServerDomain);
+
+ reader.start();
+ writer.start();
+
+ // Create a thread to send heartbeat messages.
+ if (heartbeatInterval > 0)
{
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + " RS failed to start locally " +
- " the connection from serverID="+serverId);
+ heartbeatThread = new HeartbeatThread(
+ "replication Heartbeat to " + serverURL +
+ " for " + this.baseDn,
+ session, heartbeatInterval/3);
+ heartbeatThread.start();
}
- session.close();
- } catch (IOException e1)
- {
- // ignore
+
+ DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+ DirectoryServer.registerMonitorProvider(this);
}
+ else
+ {
+ // the connection is not valid, close it.
+ try
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + " RS failed to start locally " +
+ " the connection from serverID="+serverId);
+ }
+ session.close();
+ } catch (IOException e1)
+ {
+ // ignore
+ }
+ }
+ }
+ else
+ {
+ // For a hanshakeOnly connection, let's only create a reader
+ // in order to detect the connection closure.
+ reader = new ServerReader(session, serverId,
+ this, replicationServerDomain);
+ reader.start();
}
}
catch (Exception e)
@@ -842,22 +857,22 @@
/**
* Get the age of the older change that has not yet been replicated
* to the server handled by this ServerHandler.
- *
* @return The age if the older change has not yet been replicated
* to the server handled by this ServerHandler.
*/
public Long getApproxFirstMissingDate()
{
- // Get the older CN received
- // From it, get the next sequence number
- // Get the CN for the next sequence number
- // If not present in the local RS db,
- // then approximate with the older update time
- ChangeNumber olderUpdateCN = getOlderUpdateCN();
- if (olderUpdateCN == null)
- return null;
+ Long result = (long)0;
- return olderUpdateCN.getTime();
+ // Get the older CN received
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN != null)
+ {
+ // If not present in the local RS db,
+ // then approximate with the older update time
+ result=olderUpdateCN.getTime();
+ }
+ return result;
}
/**
@@ -874,29 +889,82 @@
/**
* Get the older Change Number for that server.
+ * Returns null when the queue is empty.
* @return The older change number.
*/
public ChangeNumber getOlderUpdateCN()
{
+ ChangeNumber result = null;
synchronized (msgQueue)
{
if (isFollowing())
{
if (msgQueue.isEmpty())
- return null;
-
- UpdateMessage msg = msgQueue.first();
- return msg.getChangeNumber();
+ {
+ result=null;
+ }
+ else
+ {
+ UpdateMessage msg = msgQueue.first();
+ result = msg.getChangeNumber();
+ }
}
else
{
if (lateQueue.isEmpty())
- return null;
+ {
+ // isFollowing is false AND lateQueue is empty
+ // We may be at the very moment when the writer has emptyed the
+ // lateQueue when it sent the last update. The writer will fill again
+ // the lateQueue when it will send the next update but we are not yet
+ // there. So let's take the last change not sent directly from
+ // the db.
- UpdateMessage msg = lateQueue.first();
- return msg.getChangeNumber();
+ ReplicationIteratorComparator comparator =
+ new ReplicationIteratorComparator();
+ SortedSet<ReplicationIterator> iteratorSortedSet =
+ new TreeSet<ReplicationIterator>(comparator);
+ try
+ {
+ // Build a list of candidates iterator (i.e. db i.e. server)
+ for (short serverId : replicationServerDomain.getServers())
+ {
+ // get the last already sent CN from that server
+ ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
+ // get an iterator in this server db from that last change
+ ReplicationIterator iterator =
+ replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+ // if that iterator has changes, then it is a candidate
+ // it is added in the sorted list at a position given by its
+ // current change (see ReplicationIteratorComparator).
+ if ((iterator != null) && (iterator.getChange() != null))
+ {
+ iteratorSortedSet.add(iterator);
+ }
+ }
+ UpdateMessage msg = iteratorSortedSet.first().getChange();
+ result = msg.getChangeNumber();
+ }
+ catch(Exception e)
+ {
+ result=null;
+ }
+ finally
+ {
+ for (ReplicationIterator iterator : iteratorSortedSet)
+ {
+ iterator.releaseCursor();
+ }
+ }
+ }
+ else
+ {
+ UpdateMessage msg = lateQueue.first();
+ result = msg.getChangeNumber();
+ }
}
}
+ return result;
}
/**
@@ -958,7 +1026,7 @@
*/
while (msgQueue.size() > maxQueueSize)
{
- following = false;
+ setFollowing(false);
msgQueue.removeFirst();
}
}
@@ -1083,6 +1151,13 @@
}
}
}
+
+ // The loop below relies on the fact that it is sorted based
+ // on the currentChange of each iterator to consider the next
+ // change accross all servers.
+ // Hence it is necessary to remove and eventual add again an iterator
+ // when looping in order to keep consistent the order of the
+ // iterators (see ReplicationIteratorComparator.
while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
{
ReplicationIterator iterator = iteratorSortedSet.first();
@@ -1107,7 +1182,7 @@
{
if (msgQueue.size() < maxQueueSize)
{
- following = true;
+ setFollowing(true);
}
}
}
@@ -1119,7 +1194,7 @@
if (msgQueue.contains(msg))
{
/* we finally catched up with the regular queue */
- following = true;
+ setFollowing(true);
lateQueue.clear();
UpdateMessage msg1;
do
@@ -1459,14 +1534,6 @@
attributes.add(new Attribute("connected-to", this.replicationServerDomain.
getReplicationServer().getMonitorInstanceName()));
- // Add the oldest missing update
- Long olderUpdateTime = this.getApproxFirstMissingDate();
- if (olderUpdateTime != null)
- {
- Date date = new Date(olderUpdateTime);
- attributes.add(new Attribute("approx-older-change-not-synchronized",
- date.toString()));
- }
}
else
{
@@ -1477,27 +1544,42 @@
attributes.add(new Attribute("base-dn",
baseDn.toString()));
- // Update stats
-
- // Retrieves the topology counters
if (serverIsLDAPserver)
{
+ MonitorData md;
try
{
- replicationServerDomain.retrievesRemoteMonitorData();
+ md = replicationServerDomain.getMonitorData();
+
+ // Oldest missing update
+ Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+ if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
+ {
+ Date date = new Date(approxFirstMissingDate);
+ attributes.add(new Attribute("approx-older-change-not-synchronized",
+ date.toString()));
+ attributes.add(
+ new Attribute("approx-older-change-not-synchronized-millis",
+ String.valueOf(approxFirstMissingDate)));
+ }
+
+ // Missing changes
+ long missingChanges = md.getMissingChanges(serverId);
+ attributes.add(new Attribute("missing-changes",
+ String.valueOf(missingChanges)));
+
+ // Replication delay
+ long delay = md.getApproxDelay(serverId);
+ attributes.add(new Attribute("approximate-delay",
+ String.valueOf(delay)));
}
catch(Exception e)
{
- // FIXME: We failed retrieving the remote monitor data
+ // TODO: improve the log
+ // We failed retrieving the remote monitor data.
+ attributes.add(new Attribute("error",
+ stackTraceToSingleLineString(e)));
}
-
- // Compute the latency for the current SH
- int missingChanges =
- replicationServerDomain.getMissingChanges(serverState);
-
- // add the latency attribute to our monitor data
- attributes.add(new Attribute("missing-changes",
- String.valueOf(missingChanges)));
}
// Deprecated
@@ -1532,8 +1614,6 @@
attributes.add(new Attribute("waiting-changes",
String.valueOf(getRcvMsgQueueSize())));
// Age of oldest missing change
- attributes.add(new Attribute("approximate-delay",
- String.valueOf(getApproxDelay())));
// Date of the oldest missing change
long olderUpdateTime = getOlderUpdateTime();
@@ -1731,14 +1811,14 @@
List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
generationId = infoMsg.getGenerationId();
- synchronized(remoteLDAPservers)
+ synchronized(connectedServers)
{
// Removes the existing structures
- for (LightweightServerHandler lsh : remoteLDAPservers)
+ for (LightweightServerHandler lsh : connectedServers.values())
{
lsh.stopHandler();
}
- remoteLDAPservers.clear();
+ connectedServers.clear();
// Creates the new structure according to the message received.
for (String newConnectedServer : newRemoteLDAPservers)
@@ -1746,7 +1826,7 @@
LightweightServerHandler lsh
= new LightweightServerHandler(newConnectedServer, this);
lsh.startHandler();
- remoteLDAPservers.add(lsh);
+ connectedServers.put(lsh.getServerId(), lsh);
}
}
}
@@ -1762,14 +1842,17 @@
*/
public boolean isRemoteLDAPServer(short wantedServer)
{
- for (LightweightServerHandler server : remoteLDAPservers)
+ synchronized(connectedServers)
{
- if (wantedServer == server.getServerId())
+ for (LightweightServerHandler server : connectedServers.values())
{
- return true;
+ if (wantedServer == server.getServerId())
+ {
+ return true;
+ }
}
+ return false;
}
- return false;
}
/**
@@ -1781,7 +1864,7 @@
*/
public boolean hasRemoteLDAPServers()
{
- return !remoteLDAPservers.isEmpty();
+ return !connectedServers.isEmpty();
}
/**
@@ -1907,4 +1990,13 @@
{
return this.replicationServerDomain;
}
+
+ /**
+ * Return a Set containing the servers known by this replicationServer.
+ * @return a set containing the servers known by this replicationServer.
+ */
+ public Set<Short> getConnectedServerIds()
+ {
+ return connectedServers.keySet();
+ }
}
--
Gitblit v1.10.0