From 4fe72a4bef946169b0f50bc05bd9dc3b4b1131d3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 14 Aug 2009 12:37:19 +0000
Subject: [PATCH] Support for External change log compatible with draft-good-ldap-changelog-04.txt , March 2003

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |   81 +++++++++++++++++++++++++++++++++-------
 1 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index e904311..fbc1776 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -116,8 +116,8 @@
   // performPhaseOneHandshake method.
   private String tmpReadableServerName = null;
   /**
-   * The time in milliseconds between heartbeats from the replication
-   * server.  Zero means heartbeats are off.
+   * The expected duration in milliseconds between heartbeats received
+   * from the replication server.  Zero means heartbeats are off.
    */
   private long heartbeatInterval = 0;
   /**
@@ -142,6 +142,16 @@
   // Same group id poller thread
   private SameGroupIdPoller sameGroupIdPoller = null;
 
+  /**
+   * The thread that publishes messages to the RS containing the current
+   * change time of this DS.
+   */
+  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
+  /**
+   * The expected period in milliseconds between these messages are sent
+   * to the replication server.  Zero means heartbeats are off.
+   */
+  private long changeTimeHeartbeatSendInterval = 0;
   /*
    * Properties for the last topology info received from the network.
    */
@@ -159,24 +169,27 @@
    *
    * @param replicationDomain The replication domain that is creating us.
    * @param state The ServerState that should be used by this broker
-   *              when negotiating the session with the replicationServer.
+   *        when negotiating the session with the replicationServer.
    * @param baseDn The base DN that should be used by this broker
-   *              when negotiating the session with the replicationServer.
+   *        when negotiating the session with the replicationServer.
    * @param serverId The server ID that should be used by this broker
-   *              when negotiating the session with the replicationServer.
+   *        when negotiating the session with the replicationServer.
    * @param window The size of the send and receive window to use.
-   * @param heartbeatInterval The interval between heartbeats requested of the
-   * replicationServer, or zero if no heartbeats are requested.
-   *
    * @param generationId The generationId for the server associated to the
    * provided serverId and for the domain associated to the provided baseDN.
+   * @param heartbeatInterval The interval (in ms) between heartbeats requested
+   *        from the replicationServer, or zero if no heartbeats are requested.
    * @param replSessionSecurity The session security configuration.
    * @param groupId The group id of our domain.
+   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
+   *        time  heartbeats are sent to the RS,
+   *        or zero if no CN heartbeat shoud be sent.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
     ServerState state, String baseDn, short serverId, int window,
     long generationId, long heartbeatInterval,
-    ReplSessionSecurity replSessionSecurity, byte groupId)
+    ReplSessionSecurity replSessionSecurity, byte groupId,
+    long changeTimeHeartbeatInterval)
   {
     this.domain = replicationDomain;
     this.baseDn = baseDn;
@@ -190,6 +203,7 @@
     this.maxRcvWindow = window;
     this.maxRcvWindow = window;
     this.halfRcvWindow = window /2;
+    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
   }
 
   /**
@@ -392,7 +406,8 @@
 
     // Stop any existing poller and heartbeat monitor from a previous session.
     stopSameGroupIdPoller();
-    stopHeartBeat();
+    stopRSHeartBeatMonitoring();
+    stopChangeTimeHeartBeatPublishing();
 
     boolean newServerWithSameGroupId = false;
     synchronized (connectPhaseLock)
@@ -508,7 +523,8 @@
                  logError(message);
                  startSameGroupIdPoller();
                 }
-                startHeartBeat();
+                startRSHeartBeatMonitoring();
+                startChangeTimeHeartBeatPublishing();
               } else
               {
                 // Detected new RS with our group id: log disconnection to
@@ -1025,7 +1041,7 @@
       // Send our Start Session
       StartECLSessionMsg startECLSessionMsg = null;
       startECLSessionMsg = new StartECLSessionMsg();
-      startECLSessionMsg.setOperationId(Short.toString(serverId));
+      startECLSessionMsg.setOperationId("-1");
       session.publish(startECLSessionMsg);
 
       /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
@@ -1428,7 +1444,7 @@
   /**
    * Start the heartbeat monitor thread.
    */
-  private void startHeartBeat()
+  private void startRSHeartBeatMonitoring()
   {
     // Start a heartbeat monitor thread.
     if (heartbeatInterval > 0)
@@ -1467,7 +1483,7 @@
   /**
    * Stop the heartbeat monitor thread.
    */
-  void stopHeartBeat()
+  void stopRSHeartBeatMonitoring()
   {
     if (heartbeatMonitor != null)
     {
@@ -1753,7 +1769,8 @@
         + " domain " + baseDn);
     }
     stopSameGroupIdPoller();
-    stopHeartBeat();
+    stopRSHeartBeatMonitoring();
+    stopChangeTimeHeartBeatPublishing();
     replicationServer = "stopped";
     shutdown = true;
     connected = false;
@@ -2156,4 +2173,38 @@
   {
     return connectionError;
   }
+
+  /**
+   * Starts publishing to the RS the current timestamp used in this server.
+   */
+  public void startChangeTimeHeartBeatPublishing()
+  {
+    // Start a CN heartbeat thread.
+    if (changeTimeHeartbeatSendInterval > 0)
+    {
+      ctHeartbeatPublisherThread =
+        new CTHeartbeatPublisherThread(
+            "Replication CN Heartbeat Thread started for " +
+            baseDn + " with " + getReplicationServer(),
+            session, changeTimeHeartbeatSendInterval, serverId);
+      ctHeartbeatPublisherThread.start();
+    }
+    else
+    {
+      TRACER.debugInfo(this +
+          " is not configured to send CN heartbeat interval");
+    }
+  }
+
+  /**
+   * Stops publishing to the RS the current timestamp used in this server.
+   */
+  public void stopChangeTimeHeartBeatPublishing()
+  {
+    if (ctHeartbeatPublisherThread != null)
+    {
+      ctHeartbeatPublisherThread.shutdown();
+      ctHeartbeatPublisherThread = null;
+    }
+  }
 }

--
Gitblit v1.10.0