From 4fe72a4bef946169b0f50bc05bd9dc3b4b1131d3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 14 Aug 2009 12:37:19 +0000
Subject: [PATCH] Support for External change log compatible with draft-good-ldap-changelog-04.txt , March 2003
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 81 +++++++++++++++++++++++++++++++++-------
1 files changed, 66 insertions(+), 15 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index e904311..fbc1776 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -116,8 +116,8 @@
// performPhaseOneHandshake method.
private String tmpReadableServerName = null;
/**
- * The time in milliseconds between heartbeats from the replication
- * server. Zero means heartbeats are off.
+ * The expected duration in milliseconds between heartbeats received
+ * from the replication server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
/**
@@ -142,6 +142,16 @@
// Same group id poller thread
private SameGroupIdPoller sameGroupIdPoller = null;
+ /**
+ * The thread that publishes messages to the RS containing the current
+ * change time of this DS.
+ */
+ private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
+ /**
+ * The expected period in milliseconds between these messages are sent
+ * to the replication server. Zero means heartbeats are off.
+ */
+ private long changeTimeHeartbeatSendInterval = 0;
/*
* Properties for the last topology info received from the network.
*/
@@ -159,24 +169,27 @@
*
* @param replicationDomain The replication domain that is creating us.
* @param state The ServerState that should be used by this broker
- * when negotiating the session with the replicationServer.
+ * when negotiating the session with the replicationServer.
* @param baseDn The base DN that should be used by this broker
- * when negotiating the session with the replicationServer.
+ * when negotiating the session with the replicationServer.
* @param serverId The server ID that should be used by this broker
- * when negotiating the session with the replicationServer.
+ * when negotiating the session with the replicationServer.
* @param window The size of the send and receive window to use.
- * @param heartbeatInterval The interval between heartbeats requested of the
- * replicationServer, or zero if no heartbeats are requested.
- *
* @param generationId The generationId for the server associated to the
* provided serverId and for the domain associated to the provided baseDN.
+ * @param heartbeatInterval The interval (in ms) between heartbeats requested
+ * from the replicationServer, or zero if no heartbeats are requested.
* @param replSessionSecurity The session security configuration.
* @param groupId The group id of our domain.
+ * @param changeTimeHeartbeatInterval The interval (in ms) between Change
+ * time heartbeats are sent to the RS,
+ * or zero if no CN heartbeat shoud be sent.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
ServerState state, String baseDn, short serverId, int window,
long generationId, long heartbeatInterval,
- ReplSessionSecurity replSessionSecurity, byte groupId)
+ ReplSessionSecurity replSessionSecurity, byte groupId,
+ long changeTimeHeartbeatInterval)
{
this.domain = replicationDomain;
this.baseDn = baseDn;
@@ -190,6 +203,7 @@
this.maxRcvWindow = window;
this.maxRcvWindow = window;
this.halfRcvWindow = window /2;
+ this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
}
/**
@@ -392,7 +406,8 @@
// Stop any existing poller and heartbeat monitor from a previous session.
stopSameGroupIdPoller();
- stopHeartBeat();
+ stopRSHeartBeatMonitoring();
+ stopChangeTimeHeartBeatPublishing();
boolean newServerWithSameGroupId = false;
synchronized (connectPhaseLock)
@@ -508,7 +523,8 @@
logError(message);
startSameGroupIdPoller();
}
- startHeartBeat();
+ startRSHeartBeatMonitoring();
+ startChangeTimeHeartBeatPublishing();
} else
{
// Detected new RS with our group id: log disconnection to
@@ -1025,7 +1041,7 @@
// Send our Start Session
StartECLSessionMsg startECLSessionMsg = null;
startECLSessionMsg = new StartECLSessionMsg();
- startECLSessionMsg.setOperationId(Short.toString(serverId));
+ startECLSessionMsg.setOperationId("-1");
session.publish(startECLSessionMsg);
/* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
@@ -1428,7 +1444,7 @@
/**
* Start the heartbeat monitor thread.
*/
- private void startHeartBeat()
+ private void startRSHeartBeatMonitoring()
{
// Start a heartbeat monitor thread.
if (heartbeatInterval > 0)
@@ -1467,7 +1483,7 @@
/**
* Stop the heartbeat monitor thread.
*/
- void stopHeartBeat()
+ void stopRSHeartBeatMonitoring()
{
if (heartbeatMonitor != null)
{
@@ -1753,7 +1769,8 @@
+ " domain " + baseDn);
}
stopSameGroupIdPoller();
- stopHeartBeat();
+ stopRSHeartBeatMonitoring();
+ stopChangeTimeHeartBeatPublishing();
replicationServer = "stopped";
shutdown = true;
connected = false;
@@ -2156,4 +2173,38 @@
{
return connectionError;
}
+
+ /**
+ * Starts publishing to the RS the current timestamp used in this server.
+ */
+ public void startChangeTimeHeartBeatPublishing()
+ {
+ // Start a CN heartbeat thread.
+ if (changeTimeHeartbeatSendInterval > 0)
+ {
+ ctHeartbeatPublisherThread =
+ new CTHeartbeatPublisherThread(
+ "Replication CN Heartbeat Thread started for " +
+ baseDn + " with " + getReplicationServer(),
+ session, changeTimeHeartbeatSendInterval, serverId);
+ ctHeartbeatPublisherThread.start();
+ }
+ else
+ {
+ TRACER.debugInfo(this +
+ " is not configured to send CN heartbeat interval");
+ }
+ }
+
+ /**
+ * Stops publishing to the RS the current timestamp used in this server.
+ */
+ public void stopChangeTimeHeartBeatPublishing()
+ {
+ if (ctHeartbeatPublisherThread != null)
+ {
+ ctHeartbeatPublisherThread.shutdown();
+ ctHeartbeatPublisherThread = null;
+ }
+ }
}
--
Gitblit v1.10.0