From 0f942fb22a49820dacbc16bd9769fbb479e0e4f2 Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Fri, 02 Feb 2007 21:50:10 +0000
Subject: [PATCH] These changes are for issue 787: LDAP server need to detect failure of changelog servers
---
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java | 66 ++++++++++++++++++++++++++++-----
1 files changed, 56 insertions(+), 10 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index 5a7d784..5239d75 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -94,6 +94,24 @@
private int timeout = 0;
/**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval = 0;
+
+
+ /**
+ * A thread to monitor heartbeats on the session.
+ */
+ private HeartbeatMonitor heartbeatMonitor = null;
+
+ /**
+ * The number of times the connection was lost.
+ */
+ private int numLostConnections = 0;
+
+
+ /**
* Creates a new Changelog Broker for a particular SynchronizationDomain.
*
* @param state The ServerState that should be used by this broker
@@ -110,10 +128,12 @@
* the changelog server.
* @param maxSendDelay The maximum send delay to use on the changelog server.
* @param window The size of the send and receive window to use.
+ * @param heartbeatInterval The interval between heartbeats requested of the
+ * changelog server, or zero if no heartbeats are requested.
*/
public ChangelogBroker(ServerState state, DN baseDn, short serverID,
int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window)
+ int maxSendDelay, int window, long heartbeatInterval)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -127,6 +147,7 @@
this.rcvWindow = window;
this.maxRcvWindow = window;
this.halfRcvWindow = window/2;
+ this.heartbeatInterval = heartbeatInterval;
}
/**
@@ -157,7 +178,7 @@
/**
- * Connect the Changelog server to other servers.
+ * Connect to a Changelog server.
*
* @throws NumberFormatException address was invalid
* @throws IOException error during connection phase
@@ -166,6 +187,13 @@
{
ChangelogStartMessage startMsg;
+ // Stop any existing heartbeat monitor from a previous session.
+ if (heartbeatMonitor != null)
+ {
+ heartbeatMonitor.shutdown();
+ heartbeatMonitor = null;
+ }
+
boolean checkState = true;
while( !connected)
{
@@ -191,9 +219,9 @@
/*
* Send our ServerStartMessage.
*/
- ServerStartMessage msg = new ServerStartMessage( serverID, baseDn,
+ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow*2, state);
+ halfRcvWindow*2, heartbeatInterval, state);
session.publish(msg);
@@ -369,6 +397,15 @@
}
}
}
+
+ // Start a heartbeat monitor thread.
+ if (heartbeatInterval > 0)
+ {
+ heartbeatMonitor =
+ new HeartbeatMonitor("Synchronization Heartbeat Monitor", session,
+ heartbeatInterval);
+ heartbeatMonitor.start();
+ }
}
@@ -379,6 +416,8 @@
*/
private void reStart(ProtocolSession failingSession)
{
+ numLostConnections++;
+
try
{
failingSession.close();
@@ -445,7 +484,7 @@
/**
* Receive a message.
* @return the received message
- * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+ * @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
*/
public SynchronizationMessage receive() throws SocketTimeoutException
@@ -474,13 +513,11 @@
}
return msg;
}
+ } catch (SocketTimeoutException e)
+ {
+ throw e;
} catch (Exception e)
{
- if (e instanceof SocketTimeoutException)
- {
- SocketTimeoutException e1 = (SocketTimeoutException) e;
- throw e1;
- }
if (shutdown == false)
{
synchronized (lock)
@@ -631,4 +668,13 @@
else
return 0;
}
+
+ /**
+ * Get the number of times the connection was lost.
+ * @return The number of times the connection was lost.
+ */
+ public int getNumLostConnections()
+ {
+ return numLostConnections;
+ }
}
--
Gitblit v1.10.0