From 11859d9a6e466bab4ab73e1e46d092c6052acf68 Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Fri, 02 Feb 2007 21:50:10 +0000
Subject: [PATCH] These changes are for issue 787: LDAP server need to detect failure of changelog servers

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java                             |   40 ++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                                     |    8 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java         |    9 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java                           |    4 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java                                |   21 +
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java                                  |   38 ++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java                                  |   66 ++++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java                                 |   44 +++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java                            |   64 ++++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java                               |   85 ++++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java             |   95 ++++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java |   36 -
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java                                |  171 ++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java                                 |  139 +++++++++
 opendj-sdk/opends/resource/schema/02-config.ldif                                                                            |    6 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java                         |    6 
 16 files changed, 784 insertions(+), 48 deletions(-)

diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index e2f2eef..b7605d5 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1038,6 +1038,10 @@
   NAME 'ds-cfg-group-implementation-enabled'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.305
+  NAME 'ds-cfg-heartbeat-interval'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
   MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1296,7 +1300,7 @@
   $ ds-cfg-synchronization-dn )
   MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $
   ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $
-  ds-cfg-window-size )
+  ds-cfg-window-size $ ds-cfg-heartbeat-interval )
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.59
   NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index bac26ee..1b61fec 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -389,10 +389,10 @@
   }
 
   /**
-   * This method manage the connection with the other LDAP servers
-   * it periodically that this changelog server is correctly connected
-   * to all the other changelog servers and if not attempt to
-   * do the connection.
+   * This method manages the connection with the other changelog servers.
+   * It periodically checks that this changelog server is indeed connected
+   * to all the other changelog servers and if not attempts to
+   * make the connection.
    */
   private void runConnect()
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 6c74065..27a6be2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -61,6 +61,7 @@
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
 import org.opends.server.synchronization.protocol.UpdateMessage;
 import org.opends.server.synchronization.protocol.WindowMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
 import org.opends.server.util.TimeThread;
 
 /**
@@ -108,6 +109,17 @@
                                        // be stopped from sending messsages.
   private int saturationCount = 0;
 
+  /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval = 0;
+
+  /**
+   * The thread that will send heartbeats.
+   */
+  HeartbeatThread heartbeatThread = null;
+
   private static Map<ChangeNumber, ChangelogAckMessageList>
    changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
 
@@ -173,6 +185,7 @@
         maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
         maxSendDelay = receivedMsg.getMaxSendDelay();
         maxSendQueue = receivedMsg.getMaxSendQueue();
+        heartbeatInterval = receivedMsg.getHeartbeatInterval();
 
         if (maxReceiveQueue > 0)
           restartReceiveQueue = (maxReceiveQueue > 1000 ?
@@ -199,6 +212,12 @@
                               maxSendDelay);
         else
           restartSendDelay = 0;
+
+        if (heartbeatInterval < 0)
+        {
+          heartbeatInterval = 0;
+        }
+
         serverIsLDAPserver = true;
 
         changelogCache = changelog.getChangelogCache(this.baseDn);
@@ -256,6 +275,16 @@
 
       reader.start();
       writer.start();
+
+      // Create a thread to send heartbeat messages.
+      if (heartbeatInterval > 0)
+      {
+        heartbeatThread = new HeartbeatThread("Synchronization Heartbeat",
+                                              session, heartbeatInterval);
+        heartbeatThread.start();
+      }
+
+
       DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
       DirectoryServer.registerMonitorProvider(this);
     }
@@ -853,6 +882,12 @@
       msgQueue.notifyAll();
     }
 
+    // Stop the heartbeat thread.
+    if (heartbeatThread != null)
+    {
+      heartbeatThread.shutdown();
+    }
+
     DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
   }
 
@@ -1225,4 +1260,13 @@
   {
     sendWindow.release(windowMsg.getNumAck());
   }
+
+  /**
+   * Get our heartbeat interval.
+   * @return Our heartbeat interval.
+   */
+  public long getHeartbeatInterval()
+  {
+    return heartbeatInterval;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index 5a7d784..5239d75 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -94,6 +94,24 @@
   private int timeout = 0;
 
   /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval = 0;
+
+
+  /**
+   * A thread to monitor heartbeats on the session.
+   */
+  private HeartbeatMonitor heartbeatMonitor = null;
+
+  /**
+   * The number of times the connection was lost.
+   */
+  private int numLostConnections = 0;
+
+
+  /**
    * Creates a new Changelog Broker for a particular SynchronizationDomain.
    *
    * @param state The ServerState that should be used by this broker
@@ -110,10 +128,12 @@
    *                     the changelog server.
    * @param maxSendDelay The maximum send delay to use on the changelog server.
    * @param window The size of the send and receive window to use.
+   * @param heartbeatInterval The interval between heartbeats requested of the
+   * changelog server, or zero if no heartbeats are requested.
    */
   public ChangelogBroker(ServerState state, DN baseDn, short serverID,
       int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-      int maxSendDelay, int window)
+      int maxSendDelay, int window, long heartbeatInterval)
   {
     this.baseDn = baseDn;
     this.serverID = serverID;
@@ -127,6 +147,7 @@
     this.rcvWindow = window;
     this.maxRcvWindow = window;
     this.halfRcvWindow = window/2;
+    this.heartbeatInterval = heartbeatInterval;
   }
 
   /**
@@ -157,7 +178,7 @@
 
 
   /**
-   * Connect the Changelog server to other servers.
+   * Connect to a Changelog server.
    *
    * @throws NumberFormatException address was invalid
    * @throws IOException error during connection phase
@@ -166,6 +187,13 @@
   {
     ChangelogStartMessage startMsg;
 
+    // Stop any existing heartbeat monitor from a previous session.
+    if (heartbeatMonitor != null)
+    {
+      heartbeatMonitor.shutdown();
+      heartbeatMonitor = null;
+    }
+
     boolean checkState = true;
     while( !connected)
     {
@@ -191,9 +219,9 @@
           /*
            * Send our ServerStartMessage.
            */
-          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
+          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
               maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              halfRcvWindow*2, state);
+              halfRcvWindow*2, heartbeatInterval, state);
           session.publish(msg);
 
 
@@ -369,6 +397,15 @@
         }
       }
     }
+
+    // Start a heartbeat monitor thread.
+    if (heartbeatInterval > 0)
+    {
+      heartbeatMonitor =
+           new HeartbeatMonitor("Synchronization Heartbeat Monitor", session,
+                                heartbeatInterval);
+      heartbeatMonitor.start();
+    }
   }
 
 
@@ -379,6 +416,8 @@
    */
   private void reStart(ProtocolSession failingSession)
   {
+    numLostConnections++;
+
     try
     {
       failingSession.close();
@@ -445,7 +484,7 @@
   /**
    * Receive a message.
    * @return the received message
-   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+   * @throws SocketTimeoutException if the timeout set by setSoTimeout
    *         has expired
    */
   public SynchronizationMessage receive() throws SocketTimeoutException
@@ -474,13 +513,11 @@
           }
           return msg;
         }
+      } catch (SocketTimeoutException e)
+      {
+        throw e;
       } catch (Exception e)
       {
-        if (e instanceof SocketTimeoutException)
-        {
-          SocketTimeoutException e1 = (SocketTimeoutException) e;
-          throw e1;
-        }
         if (shutdown == false)
         {
           synchronized (lock)
@@ -631,4 +668,13 @@
     else
       return 0;
   }
+
+  /**
+   * Get the number of times the connection was lost.
+   * @return The number of times the connection was lost.
+   */
+  public int getNumLostConnections()
+  {
+    return numLostConnections;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
new file mode 100644
index 0000000..2065217
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -0,0 +1,139 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization.plugin;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import static org.opends.server.loggers.Debug.debugMessage;
+import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION;
+import static org.opends.server.types.DebugLogSeverity.INFO;
+
+import java.io.IOException;
+
+/**
+ * This class implements a thread to monitor heartbeat messages from the
+ * synchronization server.  Each broker runs one of these threads.
+ */
+public class HeartbeatMonitor extends DirectoryThread
+{
+  /**
+   * The fully-qualified name of this class for debugging purposes.
+   */
+  private static final String CLASS_NAME =
+       "org.opends.server.synchronization.plugin.HeartbeatMonitor";
+
+
+  /**
+   * The session on which heartbeats are to be monitored.
+   */
+  private ProtocolSession session;
+
+
+  /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval;
+
+
+  /**
+   * Set this to stop the thread.
+   */
+  private boolean shutdown = false;
+
+
+  /**
+   * Create a heartbeat monitor thread.
+   * @param threadName The name of the heartbeat thread.
+   * @param session The session on which heartbeats are to be monitored.
+   * @param heartbeatInterval The expected interval between heartbeats in
+   * milliseconds.
+   */
+  public HeartbeatMonitor(String threadName, ProtocolSession session,
+                          long heartbeatInterval)
+  {
+    super(threadName);
+    this.session = session;
+    this.heartbeatInterval = heartbeatInterval;
+  }
+
+  /**
+   * Call this method to stop the thread.
+   */
+  public void shutdown()
+  {
+    shutdown = true;
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void run()
+  {
+    debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+                 "Heartbeat monitor is starting, expected interval is "
+                      + heartbeatInterval);
+    try
+    {
+      while (!shutdown)
+      {
+        long now = System.currentTimeMillis();
+        long lastReceiveTime = session.getLastReceiveTime();
+        if (now > lastReceiveTime + 2 * heartbeatInterval)
+        {
+          // Heartbeat is well overdue so the server is assumed to be dead.
+          debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+                       "Heartbeat monitor is closing the broker " +
+                            "session because it could not detect a " +
+                            "heartbeat.");
+          session.close();
+          break;
+        }
+        try
+        {
+          Thread.sleep(heartbeatInterval);
+        }
+        catch (InterruptedException e)
+        {
+          // That's OK.
+        }
+      }
+    }
+    catch (IOException e)
+    {
+      // Hope that's OK.
+    }
+    finally
+    {
+      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+                   "Heartbeat monitor is exiting.");
+    }
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 6bb188d..1369bed 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,6 +27,12 @@
 package org.opends.server.synchronization.plugin;
 
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.util.ServerConstants.
+     TIME_UNIT_MILLISECONDS_ABBR;
+import static org.opends.server.util.ServerConstants.
+     TIME_UNIT_MILLISECONDS_FULL;
+import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
+import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
 import static org.opends.server.synchronization.common.LogMessages.*;
 import static org.opends.server.synchronization.plugin.Historical.*;
 import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -40,6 +46,7 @@
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.LinkedHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
@@ -52,6 +59,7 @@
 import org.opends.server.config.DNConfigAttribute;
 import org.opends.server.config.IntegerConfigAttribute;
 import org.opends.server.config.StringConfigAttribute;
+import org.opends.server.config.IntegerWithUnitConfigAttribute;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
@@ -123,6 +131,12 @@
   private int maxReceiveDelay = 0;
   private int maxSendDelay = 0;
 
+  /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval = 0;
+
   private short serverId;
 
   private BooleanConfigAttribute receiveStatusStub;
@@ -151,6 +165,7 @@
   static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
   static String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
   static String WINDOW_SIZE = "ds-cfg-window-size";
+  static String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval";
 
   private static final StringConfigAttribute changelogStub =
     new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
@@ -165,6 +180,25 @@
                           true, false, false);
 
   /**
+   * The set of time units that will be used for expressing the heartbeat
+   * interval.
+   */
+  private static final LinkedHashMap<String,Double> timeUnits =
+       new LinkedHashMap<String,Double>();
+
+
+
+  static
+  {
+    timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D);
+    timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D);
+    timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D);
+    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
+  }
+
+
+
+  /**
    * Creates a new SynchronizationDomain using configuration from configEntry.
    *
    * @param configEntry The ConfigEntry to use to read the configuration of this
@@ -302,6 +336,24 @@
       configAttributes.add(windowAttr);
     }
 
+    IntegerWithUnitConfigAttribute heartbeatStub =
+      new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL,
+                                         "heartbeat interval",
+                                         false, timeUnits, true, 0, false, 0);
+    IntegerWithUnitConfigAttribute heartbeatAttr =
+      (IntegerWithUnitConfigAttribute)
+           configEntry.getConfigAttribute(heartbeatStub);
+    if (heartbeatAttr == null)
+    {
+      // Attribute is not present : use the default value
+      heartbeatInterval = 1000;
+    }
+    else
+    {
+      heartbeatInterval = heartbeatAttr.activeCalculatedValue();
+      configAttributes.add(heartbeatAttr);
+    }
+
     configDn = configEntry.getDN();
     DirectoryServer.registerConfigurableComponent(this);
 
@@ -315,7 +367,8 @@
     try
     {
       broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
-          maxReceiveDelay, maxSendQueue, maxSendDelay, window);
+          maxReceiveDelay, maxSendQueue, maxSendDelay, window,
+          heartbeatInterval);
       synchronized (broker)
       {
         broker.start(changelogServers);
@@ -1762,4 +1815,13 @@
   {
     return broker.getCurrentSendWindow();
   }
+
+  /**
+   * Get the number of times the synchronization connection was lost.
+   * @return The number of times the synchronization connection was lost.
+   */
+  public int getNumLostConnections()
+  {
+    return broker.getNumLostConnections();
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
index 1b8ae4b..5a1dc30 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
@@ -97,6 +97,10 @@
     attr = new Attribute("connected-to", domain.getChangelogServer());
     attributes.add(attr);
 
+    /* get number of lost connections */
+    addMonitorData(attributes, "lost-connections",
+                   domain.getNumLostConnections());
+
     /* get number of received updates */
     addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates());
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java
new file mode 100644
index 0000000..a6d3cc4
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java
@@ -0,0 +1,85 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization.protocol;
+
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is sent at regular intervals by the synchronization server
+ * when it is sending no other messages.  It allows the directory server to
+ * detect a problem sooner when a synchronization server has crashed or has
+ * been isolated from the network.
+ */
+public class HeartbeatMessage extends SynchronizationMessage
+{
+  /**
+   * Create a new HeartbeatMessage.
+   *
+   */
+  public HeartbeatMessage()
+  {
+  }
+
+  /**
+   * Creates a new heartbeat message from its encoded form.
+   *
+   * @param in The byte array containing the encoded form of the message.
+   * @throws java.util.zip.DataFormatException If the byte array does not
+   * contain a valid encoded form of the message.
+   */
+  public HeartbeatMessage(byte[] in) throws DataFormatException
+  {
+    /* The heartbeat message is encoded in the form :
+     * <msg-type>
+     */
+
+    /* first byte is the type */
+    if (in.length != 1 || in[0] != MSG_TYPE_HEARTBEAT)
+      throw new DataFormatException("Input is not a valid Heartbeat Message.");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    /*
+     * The heartbeat message contains:
+     * <msg-type>
+     */
+    int length = 1;
+    byte[] resultByteArray = new byte[length];
+
+    /* put the message type */
+    resultByteArray[0] = MSG_TYPE_HEARTBEAT;
+
+    return resultByteArray;
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java
new file mode 100644
index 0000000..1121dad
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java
@@ -0,0 +1,171 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization.protocol;
+
+import org.opends.server.api.DirectoryThread;
+import static org.opends.server.loggers.Debug.debugMessage;
+import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION;
+import static org.opends.server.types.DebugLogSeverity.INFO;
+import static org.opends.server.types.DebugLogSeverity.VERBOSE;
+
+import java.io.IOException;
+
+/**
+ * This thread publishes a heartbeat message on a given protocol session at
+ * regular intervals when there are no other synchronization messages being
+ * published.
+ */
+public class HeartbeatThread extends DirectoryThread
+{
+  /**
+   * The fully-qualified name of this class for debugging purposes.
+   */
+  private static final String CLASS_NAME =
+       "org.opends.server.synchronization.plugin.HeartbeatThread";
+
+
+  /**
+   * For test purposes only to simulate loss of heartbeats.
+   */
+  static private boolean heartbeatsDisabled = false;
+
+  /**
+   * The session on which heartbeats are to be sent.
+   */
+  private ProtocolSession session;
+
+
+  /**
+   * The time in milliseconds between heartbeats.
+   */
+  private long heartbeatInterval;
+
+
+  /**
+   * Set this to stop the thread.
+   */
+  private boolean shutdown = false;
+
+
+  /**
+   * Create a heartbeat thread.
+   * @param threadName The name of the heartbeat thread.
+   * @param session The session on which heartbeats are to be sent.
+   * @param heartbeatInterval The desired interval between heartbeats in
+   * milliseconds.
+   */
+  public HeartbeatThread(String threadName, ProtocolSession session,
+                  long heartbeatInterval)
+  {
+    super(threadName);
+    this.session = session;
+    this.heartbeatInterval = heartbeatInterval;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void run()
+  {
+    try
+    {
+      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+                   "Heartbeat thread is starting, interval is " +
+                        heartbeatInterval);
+      HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
+
+      while (!shutdown)
+      {
+        long now = System.currentTimeMillis();
+        debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
+                     "Heartbeat thread awoke at " + now +
+                          ", last message was sent at " +
+                          session.getLastPublishTime());
+
+        if (now > session.getLastPublishTime() + heartbeatInterval)
+        {
+          if (!heartbeatsDisabled)
+          {
+            debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
+                         "Heartbeat sent at " + now);
+            session.publish(heartbeatMessage);
+          }
+        }
+
+        try
+        {
+          long sleepTime = session.getLastPublishTime() +
+               heartbeatInterval - now;
+          if (sleepTime <= 0)
+          {
+            sleepTime = heartbeatInterval;
+          }
+
+          debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
+                       "Heartbeat thread sleeping for " + sleepTime);
+          Thread.sleep(sleepTime);
+        }
+        catch (InterruptedException e)
+        {
+          // Keep looping.
+        }
+      }
+    }
+    catch (IOException e)
+    {
+      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+                   "Heartbeat thread could not send a heartbeat.");
+      // This will be caught in another thread.
+    }
+    finally
+    {
+      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+                   "Heartbeat thread is exiting.");
+    }
+  }
+
+
+  /**
+   * Call this method to stop the thread.
+   */
+  public void shutdown()
+  {
+    shutdown = true;
+  }
+
+
+  /**
+   * For testing purposes only to simulate loss of heartbeats.
+   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
+   */
+  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
+  {
+    HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
index 7da612d..947243c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
@@ -43,8 +43,7 @@
 
   /**
    * This method is called when the session with the remote must be closed.
-   * It must
-   * This object  won't be used anymore after this method is called.
+   * This object won't be used anymore after this method is called.
    *
    * @throws IOException If an error happen during the close process.
    */
@@ -103,4 +102,22 @@
   *         such as a TCP error.
   */
   public abstract void setSoTimeout(int timeout) throws SocketException;
+
+
+
+  /**
+   * Gets the time the last synchronization message was published on this
+   * session.
+   * @return The timestamp in milliseconds of the last message published.
+   */
+  public abstract long getLastPublishTime();
+
+
+
+  /**
+   * Gets the time the last synchronization message was received on this
+   * session.
+   * @return The timestamp in milliseconds of the last message received.
+   */
+  public abstract long getLastReceiveTime();
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
index 48b6c74..97cf84c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
@@ -57,6 +57,12 @@
   private ServerState serverState = null;
 
   /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval = 0;
+
+  /**
    * Create a new ServerStartMessage.
    *
    * @param serverId The serverId of the server for which the ServerStartMessage
@@ -67,11 +73,13 @@
    * @param maxSendDelay The max Send Delay from this server.
    * @param maxSendQueue The max send Queue from this server.
    * @param windowSize   The window size used by this server.
+   * @param heartbeatInterval The requested heartbeat interval.
    * @param serverState  The state of this server.
    */
   public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                             int maxReceiveQueue, int maxSendDelay,
                             int maxSendQueue, int windowSize,
+                            long heartbeatInterval,
                             ServerState serverState)
   {
     this.serverId = serverId;
@@ -80,8 +88,10 @@
     this.maxReceiveQueue = maxReceiveQueue;
     this.maxSendDelay = maxSendDelay;
     this.maxSendQueue = maxSendQueue;
-    this.serverState = serverState;
     this.windowSize = windowSize;
+    this.heartbeatInterval = heartbeatInterval;
+
+    this.serverState = serverState;
 
     try
     {
@@ -105,7 +115,7 @@
   {
     /* The ServerStartMessage is encoded in the form :
      * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
-     * <maxSendDelay><maxSendQueue><window><ServerState>
+     * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
      */
     try
     {
@@ -173,6 +183,13 @@
       pos += length +1;
 
       /*
+       * read the heartbeatInterval
+       */
+      length = getNextLength(in, pos);
+      heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      /*
       * read the ServerState
       */
       serverState = new ServerState(in, pos, in.length-1);
@@ -269,7 +286,7 @@
     /*
      * ServerStartMessage contains.
      * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
-     * <maxSendDelay><maxSendQueue><windowsize><ServerState>
+     * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState>
      */
     try {
       byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -285,6 +302,8 @@
                      String.valueOf(maxSendQueue).getBytes("UTF-8");
       byte[] byteWindowSize =
                      String.valueOf(windowSize).getBytes("UTF-8");
+      byte[] byteHeartbeatInterval =
+                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
       byte[] byteServerState = serverState.getBytes();
 
       int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
@@ -294,6 +313,7 @@
                    byteMaxSendDelay.length + 1 +
                    byteMaxSendQueue.length + 1 +
                    byteWindowSize.length + 1 +
+                   byteHeartbeatInterval.length + 1 +
                    byteServerState.length + 1;
 
       byte[] resultByteArray = new byte[length];
@@ -318,6 +338,8 @@
 
       pos = addByteArray(byteWindowSize, resultByteArray, pos);
 
+      pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
+
       pos = addByteArray(byteServerState, resultByteArray, pos);
 
       return resultByteArray;
@@ -337,4 +359,16 @@
   {
     return windowSize;
   }
+
+  /**
+   * Get the heartbeat interval requested by the ldap server that created the
+   * message.
+   *
+   * @return The heartbeat interval requested by the ldap server that created
+   * the message.
+   */
+  public long getHeartbeatInterval()
+  {
+    return heartbeatInterval;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
index 08f8285..3b1185f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -50,6 +50,18 @@
   byte[] rcvLengthBuf = new byte[8];
 
   /**
+   * The time the last message published to this session.
+   */
+  private long lastPublishTime = 0;
+
+
+  /**
+   * The time the last message was received on this session.
+   */
+  private long lastReceiveTime = 0;
+
+
+  /**
    * Creates a new SocketSession based on the provided socket.
    *
    * @param socket The Socket on which the SocketSession will be based.
@@ -87,6 +99,8 @@
     output.write(sendLengthBuf);
     output.write(buffer);
     output.flush();
+
+    lastPublishTime = System.currentTimeMillis();
   }
 
   /**
@@ -102,9 +116,13 @@
     {
       int read = input.read(rcvLengthBuf, length, 8-length);
       if (read == -1)
+      {
         throw new IOException("no more data");
+      }
       else
+      {
         length += read;
+      }
     }
 
     int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
@@ -114,7 +132,11 @@
       length = 0;
       byte[] buffer = new byte[totalLength];
       while (length < totalLength)
+      {
         length += input.read(buffer, length, totalLength - length);
+      }
+
+      lastReceiveTime = System.currentTimeMillis();
       return SynchronizationMessage.generateMsg(buffer);
     }
     catch (OutOfMemoryError e)
@@ -127,6 +149,22 @@
   /**
    * {@inheritDoc}
    */
+  public long getLastPublishTime()
+  {
+    return lastPublishTime;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public long getLastReceiveTime()
+  {
+    return lastReceiveTime;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   public String getRemoteAddress()
   {
     return socket.getInetAddress().getHostAddress();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
index 77dcc1b..8aab0f5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -46,6 +46,7 @@
   static final byte MSG_TYPE_SERVER_START = 6;
   static final byte MSG_TYPE_CHANGELOG_START = 7;
   static final byte MSG_TYPE_WINDOW = 8;
+  static final byte MSG_TYPE_HEARTBEAT = 9;
 
   /**
    * Return the byte[] representation of this message.
@@ -57,6 +58,8 @@
    * MSG_TYPE_ACK
    * MSG_TYPE_SERVER_START
    * MSG_TYPE_CHANGELOG_START
+   * MSG_TYPE_WINDOW
+   * MSG_TYPE_HEARTBEAT
    *
    * @return the byte[] representation of this message.
    */
@@ -101,6 +104,9 @@
       case MSG_TYPE_WINDOW:
         msg = new WindowMessage(buffer);
       break;
+      case MSG_TYPE_HEARTBEAT:
+        msg = new HeartbeatMessage(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index 1c59158..69c8323 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -120,7 +120,7 @@
     if (emptyOldChanges)
       state.loadState();
     ChangelogBroker broker = new ChangelogBroker(
-        state, baseDn, serverId, 0, 0, 0, 0, window_size);
+        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
     broker.start(servers);
@@ -155,7 +155,7 @@
           throws Exception, SocketException
   {
     ChangelogBroker broker = new ChangelogBroker(
-        state, baseDn, serverId, 0, 0, 0, 0, window_size);
+        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
     broker.start(servers);
@@ -164,7 +164,7 @@
 
     return broker;
   }
-  
+
   /**
    * Open a changelog session with flow control to the local Changelog server.
    *
@@ -179,7 +179,8 @@
     if (emptyOldChanges)
       state.loadState();
     ChangelogBroker broker = new ChangelogBroker(
-        state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
+        state, baseDn, serverId, maxRcvQueue, 0,
+        maxSendQueue, 0, window_size, 0);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
     broker.start(servers);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 45652ce..6da10d0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -46,6 +46,7 @@
 import org.opends.server.synchronization.protocol.ModifyDNMsg;
 import org.opends.server.synchronization.protocol.ModifyMsg;
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
@@ -311,6 +312,100 @@
   }
 
   /**
+   * Tests whether the synchronization provider fails over when it loses
+   * the heartbeat from the synchronization server.
+   */
+  @Test(groups = "slow")
+  public void lostHeartbeatFailover() throws Exception
+  {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization test : lostHeartbeatFailover" , 1);
+
+    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+    /*
+     * Open a session to the changelog server using the broker API.
+     * This must use a different serverId to that of the directory server.
+     */
+    ChangelogBroker broker =
+      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
+
+
+    /*
+     * Create a Change number generator to generate new changenumbers
+     * when we need to send operation messages to the changelog server.
+     */
+    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
+
+
+    // Create and publish an update message to add an entry.
+    AddMsg addMsg = new AddMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN().toString(),
+        user1entryUUID,
+        baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    entryList.add(personWithUUIDEntry.getDN());
+    Entry resultEntry;
+
+    // Check that the entry has been created in the directory server.
+    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, true);
+    assertNotNull(resultEntry,
+        "The ADD synchronization message was not replayed");
+
+    // Send a first modify operation message.
+    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+    ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN(), mods,
+        user1entryUUID);
+    broker.publish(modMsg);
+
+    // Check that the modify has been replayed.
+    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+                           "telephonenumber", "01 02 45", 10000, true);
+    if (!found)
+    {
+      fail("The first modification was not replayed.");
+    }
+
+    // Simulate loss of heartbeats.
+    HeartbeatThread.setHeartbeatsDisabled(true);
+    Thread.sleep(3000);
+    HeartbeatThread.setHeartbeatsDisabled(false);
+
+    // Send a second modify operation message.
+    mods = generatemods("description", "Description was changed");
+    modMsg = new ModifyMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN(), mods,
+        user1entryUUID);
+    broker.publish(modMsg);
+
+    // Check that the modify has been replayed.
+    found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+                                   "description", "Description was changed",
+                                   10000, true);
+    if (!found)
+    {
+      fail("The second modification was not replayed.");
+    }
+
+    // Delete the entries to clean the database.
+    DeleteMsg delMsg =
+      new DeleteMsg(personWithUUIDEntry.getDN().toString(),
+          gen.NewChangeNumber(), user1entryUUID);
+    broker.publish(delMsg);
+    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false);
+
+    // Check that the delete operation has been applied.
+    assertNull(resultEntry,
+        "The DELETE synchronization message was not replayed");
+    broker.stop();
+  }
+
+  /**
    * Tests the naming conflict resolution code.
    * In this test, the local server act both as an LDAP server and
    * a changelog server that are inter-connected.
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
index 01a0c91..68ea221 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -36,7 +36,6 @@
 import org.testng.annotations.Test;
 import static org.testng.Assert.*;
 
-import org.opends.server.api.ClientConnection;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
@@ -48,20 +47,9 @@
 import org.opends.server.synchronization.common.ChangeNumber;
 import org.opends.server.synchronization.common.ServerState;
 import org.opends.server.synchronization.plugin.PendingChange;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.AddMsg;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.DeleteMsg;
-import org.opends.server.synchronization.protocol.ModifyDNMsg;
-import org.opends.server.synchronization.protocol.ModifyMsg;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
-import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DN;
 import org.opends.server.types.Modification;
 import org.opends.server.types.ModificationType;
@@ -187,7 +175,7 @@
          throws Exception
   {
     DN dn = DN.decode(rawdn);
-    ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");;
+    ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");
 
     // Check uuid
     assertEquals("fakeuniqueid", msg.getUniqueId());
@@ -221,7 +209,6 @@
 
   /**
    * Build some data for the DeleteMsg test below.
-   * @throws DirectoryException
    */
   @DataProvider(name = "deleteEncodeDecode")
   public Object[][] createDelData() {
@@ -383,7 +370,7 @@
     //Create an Add operation and generate and Add msg from it
     DN dn = DN.decode(rawDN);
 
-    addOp = new AddOperation((ClientConnection) connection,
+    addOp = new AddOperation(connection,
         (long) 1, 1, null, dn, objectClassList, userAttList, opList);
     OperationContext opCtx = new AddContext(cn, "thisIsaUniqueID",
         "parentUniqueId");
@@ -404,7 +391,6 @@
 
   /**
    * Build some data for the AckMsg test below.
-   * @throws DirectoryException
    */
   @DataProvider(name = "ackMsg")
   public Object[][] createAckData() {
@@ -451,7 +437,7 @@
     // Check that retrieved CN is OK
     msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes());
   }
-  
+
   @DataProvider(name="serverStart")
   public Object [][] createServerStartMessageTestData() throws Exception
   {
@@ -469,16 +455,20 @@
   {
     state.update(new ChangeNumber((long)1, 1,(short)1));
     ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
-        window, window, window, window, window, state);
+        window, window, window, window, window, window, state);
     ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
     assertEquals(msg.getServerId(), newMsg.getServerId());
     assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+    assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
+    assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
+    assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay());
+    assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue());
     assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
-    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+    assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
     assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
         newMsg.getServerState().getMaxChangeNumber((short)1));
   }
-  
+
   @DataProvider(name="changelogStart")
   public Object [][] createChangelogStartMessageTestData() throws Exception
   {
@@ -486,7 +476,7 @@
     ServerState state = new ServerState();
     return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} };
   }
-  
+
   /**
    * Test that changelogStartMessage encoding and decoding works
    * by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
@@ -496,7 +486,7 @@
          String url, ServerState state) throws Exception
   {
     state.update(new ChangeNumber((long)1, 1,(short)1));
-    ChangelogStartMessage msg = new ChangelogStartMessage(serverId, 
+    ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
         url, baseDN, window, state);
     ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
     assertEquals(msg.getServerId(), newMsg.getServerId());
@@ -506,7 +496,7 @@
     assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
         newMsg.getServerState().getMaxChangeNumber((short)1));
   }
-  
+
   /**
    * Test that WindowMessageTest encoding and decoding works
    * by checking that : msg == new WindowMessageTest(msg.getBytes()).

--
Gitblit v1.10.0