mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

coulbeck
02.50.2007 0f942fb22a49820dacbc16bd9769fbb479e0e4f2
These changes are for issue 787:
LDAP server need to detect failure of changelog servers

The synchronization server sends a regular heartbeat message when the session is idle and there are no synchronization updates flowing. The broker attempts to re-establish a connection to the same or alternative sync server when it detects loss of heartbeats.
3 files added
13 files modified
832 ■■■■■ changed files
opends/resource/schema/02-config.ldif 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java 44 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java 66 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java 139 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java 64 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java 85 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java 171 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java 40 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java 38 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 9 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java 95 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java 36 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -1038,6 +1038,10 @@
  NAME 'ds-cfg-group-implementation-enabled'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.305
  NAME 'ds-cfg-heartbeat-interval'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
  MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1296,7 +1300,7 @@
  $ ds-cfg-synchronization-dn )
  MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $
  ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $
  ds-cfg-window-size )
  ds-cfg-window-size $ ds-cfg-heartbeat-interval )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.59
  NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -389,10 +389,10 @@
  }
  /**
   * This method manage the connection with the other LDAP servers
   * it periodically that this changelog server is correctly connected
   * to all the other changelog servers and if not attempt to
   * do the connection.
   * This method manages the connection with the other changelog servers.
   * It periodically checks that this changelog server is indeed connected
   * to all the other changelog servers and if not attempts to
   * make the connection.
   */
  private void runConnect()
  {
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -61,6 +61,7 @@
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +109,17 @@
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * The thread that will send heartbeats.
   */
  HeartbeatThread heartbeatThread = null;
  private static Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -173,6 +185,7 @@
        maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
        maxSendDelay = receivedMsg.getMaxSendDelay();
        maxSendQueue = receivedMsg.getMaxSendQueue();
        heartbeatInterval = receivedMsg.getHeartbeatInterval();
        if (maxReceiveQueue > 0)
          restartReceiveQueue = (maxReceiveQueue > 1000 ?
@@ -199,6 +212,12 @@
                              maxSendDelay);
        else
          restartSendDelay = 0;
        if (heartbeatInterval < 0)
        {
          heartbeatInterval = 0;
        }
        serverIsLDAPserver = true;
        changelogCache = changelog.getChangelogCache(this.baseDn);
@@ -256,6 +275,16 @@
      reader.start();
      writer.start();
      // Create a thread to send heartbeat messages.
      if (heartbeatInterval > 0)
      {
        heartbeatThread = new HeartbeatThread("Synchronization Heartbeat",
                                              session, heartbeatInterval);
        heartbeatThread.start();
      }
      DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
      DirectoryServer.registerMonitorProvider(this);
    }
@@ -853,6 +882,12 @@
      msgQueue.notifyAll();
    }
    // Stop the heartbeat thread.
    if (heartbeatThread != null)
    {
      heartbeatThread.shutdown();
    }
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
  }
@@ -1225,4 +1260,13 @@
  {
    sendWindow.release(windowMsg.getNumAck());
  }
  /**
   * Get our heartbeat interval.
   * @return Our heartbeat interval.
   */
  public long getHeartbeatInterval()
  {
    return heartbeatInterval;
  }
}
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -94,6 +94,24 @@
  private int timeout = 0;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  /**
   * The number of times the connection was lost.
   */
  private int numLostConnections = 0;
  /**
   * Creates a new Changelog Broker for a particular SynchronizationDomain.
   *
   * @param state The ServerState that should be used by this broker
@@ -110,10 +128,12 @@
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * changelog server, or zero if no heartbeats are requested.
   */
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window)
      int maxSendDelay, int window, long heartbeatInterval)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -127,6 +147,7 @@
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
  }
  /**
@@ -157,7 +178,7 @@
  /**
   * Connect the Changelog server to other servers.
   * Connect to a Changelog server.
   *
   * @throws NumberFormatException address was invalid
   * @throws IOException error during connection phase
@@ -166,6 +187,13 @@
  {
    ChangelogStartMessage startMsg;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
    {
      heartbeatMonitor.shutdown();
      heartbeatMonitor = null;
    }
    boolean checkState = true;
    while( !connected)
    {
@@ -191,9 +219,9 @@
          /*
           * Send our ServerStartMessage.
           */
          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              halfRcvWindow*2, state);
              halfRcvWindow*2, heartbeatInterval, state);
          session.publish(msg);
@@ -369,6 +397,15 @@
        }
      }
    }
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
           new HeartbeatMonitor("Synchronization Heartbeat Monitor", session,
                                heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -379,6 +416,8 @@
   */
  private void reStart(ProtocolSession failingSession)
  {
    numLostConnections++;
    try
    {
      failingSession.close();
@@ -445,7 +484,7 @@
  /**
   * Receive a message.
   * @return the received message
   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   */
  public SynchronizationMessage receive() throws SocketTimeoutException
@@ -474,13 +513,11 @@
          }
          return msg;
        }
      } catch (SocketTimeoutException e)
      {
        throw e;
      } catch (Exception e)
      {
        if (e instanceof SocketTimeoutException)
        {
          SocketTimeoutException e1 = (SocketTimeoutException) e;
          throw e1;
        }
        if (shutdown == false)
        {
          synchronized (lock)
@@ -631,4 +668,13 @@
    else
      return 0;
  }
  /**
   * Get the number of times the connection was lost.
   * @return The number of times the connection was lost.
   */
  public int getNumLostConnections()
  {
    return numLostConnections;
  }
}
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
New file
@@ -0,0 +1,139 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.plugin;
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
import static org.opends.server.loggers.Debug.debugMessage;
import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION;
import static org.opends.server.types.DebugLogSeverity.INFO;
import java.io.IOException;
/**
 * This class implements a thread to monitor heartbeat messages from the
 * synchronization server.  Each broker runs one of these threads.
 */
public class HeartbeatMonitor extends DirectoryThread
{
  /**
   * The fully-qualified name of this class for debugging purposes.
   */
  private static final String CLASS_NAME =
       "org.opends.server.synchronization.plugin.HeartbeatMonitor";
  /**
   * The session on which heartbeats are to be monitored.
   */
  private ProtocolSession session;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval;
  /**
   * Set this to stop the thread.
   */
  private boolean shutdown = false;
  /**
   * Create a heartbeat monitor thread.
   * @param threadName The name of the heartbeat thread.
   * @param session The session on which heartbeats are to be monitored.
   * @param heartbeatInterval The expected interval between heartbeats in
   * milliseconds.
   */
  public HeartbeatMonitor(String threadName, ProtocolSession session,
                          long heartbeatInterval)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
  }
  /**
   * Call this method to stop the thread.
   */
  public void shutdown()
  {
    shutdown = true;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void run()
  {
    debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
                 "Heartbeat monitor is starting, expected interval is "
                      + heartbeatInterval);
    try
    {
      while (!shutdown)
      {
        long now = System.currentTimeMillis();
        long lastReceiveTime = session.getLastReceiveTime();
        if (now > lastReceiveTime + 2 * heartbeatInterval)
        {
          // Heartbeat is well overdue so the server is assumed to be dead.
          debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
                       "Heartbeat monitor is closing the broker " +
                            "session because it could not detect a " +
                            "heartbeat.");
          session.close();
          break;
        }
        try
        {
          Thread.sleep(heartbeatInterval);
        }
        catch (InterruptedException e)
        {
          // That's OK.
        }
      }
    }
    catch (IOException e)
    {
      // Hope that's OK.
    }
    finally
    {
      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
                   "Heartbeat monitor is exiting.");
    }
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,6 +27,12 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_ABBR;
import static org.opends.server.util.ServerConstants.
     TIME_UNIT_MILLISECONDS_FULL;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -40,6 +46,7 @@
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -52,6 +59,7 @@
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -123,6 +131,12 @@
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  private short serverId;
  private BooleanConfigAttribute receiveStatusStub;
@@ -151,6 +165,7 @@
  static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
  static String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
  static String WINDOW_SIZE = "ds-cfg-window-size";
  static String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval";
  private static final StringConfigAttribute changelogStub =
    new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
@@ -165,6 +180,25 @@
                          true, false, false);
  /**
   * The set of time units that will be used for expressing the heartbeat
   * interval.
   */
  private static final LinkedHashMap<String,Double> timeUnits =
       new LinkedHashMap<String,Double>();
  static
  {
    timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D);
    timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D);
    timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D);
    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
  }
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   *
   * @param configEntry The ConfigEntry to use to read the configuration of this
@@ -302,6 +336,24 @@
      configAttributes.add(windowAttr);
    }
    IntegerWithUnitConfigAttribute heartbeatStub =
      new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL,
                                         "heartbeat interval",
                                         false, timeUnits, true, 0, false, 0);
    IntegerWithUnitConfigAttribute heartbeatAttr =
      (IntegerWithUnitConfigAttribute)
           configEntry.getConfigAttribute(heartbeatStub);
    if (heartbeatAttr == null)
    {
      // Attribute is not present : use the default value
      heartbeatInterval = 1000;
    }
    else
    {
      heartbeatInterval = heartbeatAttr.activeCalculatedValue();
      configAttributes.add(heartbeatAttr);
    }
    configDn = configEntry.getDN();
    DirectoryServer.registerConfigurableComponent(this);
@@ -315,7 +367,8 @@
    try
    {
      broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
          maxReceiveDelay, maxSendQueue, maxSendDelay, window);
          maxReceiveDelay, maxSendQueue, maxSendDelay, window,
          heartbeatInterval);
      synchronized (broker)
      {
        broker.start(changelogServers);
@@ -1762,4 +1815,13 @@
  {
    return broker.getCurrentSendWindow();
  }
  /**
   * Get the number of times the synchronization connection was lost.
   * @return The number of times the synchronization connection was lost.
   */
  public int getNumLostConnections()
  {
    return broker.getNumLostConnections();
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
@@ -97,6 +97,10 @@
    attr = new Attribute("connected-to", domain.getChangelogServer());
    attributes.add(attr);
    /* get number of lost connections */
    addMonitorData(attributes, "lost-connections",
                   domain.getNumLostConnections());
    /* get number of received updates */
    addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates());
opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java
New file
@@ -0,0 +1,85 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import java.util.zip.DataFormatException;
/**
 * This message is sent at regular intervals by the synchronization server
 * when it is sending no other messages.  It allows the directory server to
 * detect a problem sooner when a synchronization server has crashed or has
 * been isolated from the network.
 */
public class HeartbeatMessage extends SynchronizationMessage
{
  /**
   * Create a new HeartbeatMessage.
   *
   */
  public HeartbeatMessage()
  {
  }
  /**
   * Creates a new heartbeat message from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public HeartbeatMessage(byte[] in) throws DataFormatException
  {
    /* The heartbeat message is encoded in the form :
     * <msg-type>
     */
    /* first byte is the type */
    if (in.length != 1 || in[0] != MSG_TYPE_HEARTBEAT)
      throw new DataFormatException("Input is not a valid Heartbeat Message.");
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    /*
     * The heartbeat message contains:
     * <msg-type>
     */
    int length = 1;
    byte[] resultByteArray = new byte[length];
    /* put the message type */
    resultByteArray[0] = MSG_TYPE_HEARTBEAT;
    return resultByteArray;
  }
}
opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java
New file
@@ -0,0 +1,171 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.protocol;
import org.opends.server.api.DirectoryThread;
import static org.opends.server.loggers.Debug.debugMessage;
import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION;
import static org.opends.server.types.DebugLogSeverity.INFO;
import static org.opends.server.types.DebugLogSeverity.VERBOSE;
import java.io.IOException;
/**
 * This thread publishes a heartbeat message on a given protocol session at
 * regular intervals when there are no other synchronization messages being
 * published.
 */
public class HeartbeatThread extends DirectoryThread
{
  /**
   * The fully-qualified name of this class for debugging purposes.
   */
  private static final String CLASS_NAME =
       "org.opends.server.synchronization.plugin.HeartbeatThread";
  /**
   * For test purposes only to simulate loss of heartbeats.
   */
  static private boolean heartbeatsDisabled = false;
  /**
   * The session on which heartbeats are to be sent.
   */
  private ProtocolSession session;
  /**
   * The time in milliseconds between heartbeats.
   */
  private long heartbeatInterval;
  /**
   * Set this to stop the thread.
   */
  private boolean shutdown = false;
  /**
   * Create a heartbeat thread.
   * @param threadName The name of the heartbeat thread.
   * @param session The session on which heartbeats are to be sent.
   * @param heartbeatInterval The desired interval between heartbeats in
   * milliseconds.
   */
  public HeartbeatThread(String threadName, ProtocolSession session,
                  long heartbeatInterval)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void run()
  {
    try
    {
      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
                   "Heartbeat thread is starting, interval is " +
                        heartbeatInterval);
      HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
      while (!shutdown)
      {
        long now = System.currentTimeMillis();
        debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
                     "Heartbeat thread awoke at " + now +
                          ", last message was sent at " +
                          session.getLastPublishTime());
        if (now > session.getLastPublishTime() + heartbeatInterval)
        {
          if (!heartbeatsDisabled)
          {
            debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
                         "Heartbeat sent at " + now);
            session.publish(heartbeatMessage);
          }
        }
        try
        {
          long sleepTime = session.getLastPublishTime() +
               heartbeatInterval - now;
          if (sleepTime <= 0)
          {
            sleepTime = heartbeatInterval;
          }
          debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
                       "Heartbeat thread sleeping for " + sleepTime);
          Thread.sleep(sleepTime);
        }
        catch (InterruptedException e)
        {
          // Keep looping.
        }
      }
    }
    catch (IOException e)
    {
      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
                   "Heartbeat thread could not send a heartbeat.");
      // This will be caught in another thread.
    }
    finally
    {
      debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
                   "Heartbeat thread is exiting.");
    }
  }
  /**
   * Call this method to stop the thread.
   */
  public void shutdown()
  {
    shutdown = true;
  }
  /**
   * For testing purposes only to simulate loss of heartbeats.
   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
   */
  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
  {
    HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
  }
}
opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
@@ -43,8 +43,7 @@
  /**
   * This method is called when the session with the remote must be closed.
   * It must
   * This object  won't be used anymore after this method is called.
   * This object won't be used anymore after this method is called.
   *
   * @throws IOException If an error happen during the close process.
   */
@@ -103,4 +102,22 @@
  *         such as a TCP error.
  */
  public abstract void setSoTimeout(int timeout) throws SocketException;
  /**
   * Gets the time the last synchronization message was published on this
   * session.
   * @return The timestamp in milliseconds of the last message published.
   */
  public abstract long getLastPublishTime();
  /**
   * Gets the time the last synchronization message was received on this
   * session.
   * @return The timestamp in milliseconds of the last message received.
   */
  public abstract long getLastReceiveTime();
}
opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
@@ -57,6 +57,12 @@
  private ServerState serverState = null;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * Create a new ServerStartMessage.
   *
   * @param serverId The serverId of the server for which the ServerStartMessage
@@ -67,11 +73,13 @@
   * @param maxSendDelay The max Send Delay from this server.
   * @param maxSendQueue The max send Queue from this server.
   * @param windowSize   The window size used by this server.
   * @param heartbeatInterval The requested heartbeat interval.
   * @param serverState  The state of this server.
   */
  public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                            int maxReceiveQueue, int maxSendDelay,
                            int maxSendQueue, int windowSize,
                            long heartbeatInterval,
                            ServerState serverState)
  {
    this.serverId = serverId;
@@ -80,8 +88,10 @@
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    this.serverState = serverState;
    this.windowSize = windowSize;
    this.heartbeatInterval = heartbeatInterval;
    this.serverState = serverState;
    try
    {
@@ -105,7 +115,7 @@
  {
    /* The ServerStartMessage is encoded in the form :
     * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><window><ServerState>
     * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
     */
    try
    {
@@ -173,6 +183,13 @@
      pos += length +1;
      /*
       * read the heartbeatInterval
       */
      length = getNextLength(in, pos);
      heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
      * read the ServerState
      */
      serverState = new ServerState(in, pos, in.length-1);
@@ -269,7 +286,7 @@
    /*
     * ServerStartMessage contains.
     * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><windowsize><ServerState>
     * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState>
     */
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -285,6 +302,8 @@
                     String.valueOf(maxSendQueue).getBytes("UTF-8");
      byte[] byteWindowSize =
                     String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteHeartbeatInterval =
                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
@@ -294,6 +313,7 @@
                   byteMaxSendDelay.length + 1 +
                   byteMaxSendQueue.length + 1 +
                   byteWindowSize.length + 1 +
                   byteHeartbeatInterval.length + 1 +
                   byteServerState.length + 1;
      byte[] resultByteArray = new byte[length];
@@ -318,6 +338,8 @@
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
@@ -337,4 +359,16 @@
  {
    return windowSize;
  }
  /**
   * Get the heartbeat interval requested by the ldap server that created the
   * message.
   *
   * @return The heartbeat interval requested by the ldap server that created
   * the message.
   */
  public long getHeartbeatInterval()
  {
    return heartbeatInterval;
  }
}
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -50,6 +50,18 @@
  byte[] rcvLengthBuf = new byte[8];
  /**
   * The time the last message published to this session.
   */
  private long lastPublishTime = 0;
  /**
   * The time the last message was received on this session.
   */
  private long lastReceiveTime = 0;
  /**
   * Creates a new SocketSession based on the provided socket.
   *
   * @param socket The Socket on which the SocketSession will be based.
@@ -87,6 +99,8 @@
    output.write(sendLengthBuf);
    output.write(buffer);
    output.flush();
    lastPublishTime = System.currentTimeMillis();
  }
  /**
@@ -102,9 +116,13 @@
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      if (read == -1)
      {
        throw new IOException("no more data");
      }
      else
      {
        length += read;
      }
    }
    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
@@ -114,7 +132,11 @@
      length = 0;
      byte[] buffer = new byte[totalLength];
      while (length < totalLength)
      {
        length += input.read(buffer, length, totalLength - length);
      }
      lastReceiveTime = System.currentTimeMillis();
      return SynchronizationMessage.generateMsg(buffer);
    }
    catch (OutOfMemoryError e)
@@ -127,6 +149,22 @@
  /**
   * {@inheritDoc}
   */
  public long getLastPublishTime()
  {
    return lastPublishTime;
  }
  /**
   * {@inheritDoc}
   */
  public long getLastReceiveTime()
  {
    return lastReceiveTime;
  }
  /**
   * {@inheritDoc}
   */
  public String getRemoteAddress()
  {
    return socket.getInetAddress().getHostAddress();
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -46,6 +46,7 @@
  static final byte MSG_TYPE_SERVER_START = 6;
  static final byte MSG_TYPE_CHANGELOG_START = 7;
  static final byte MSG_TYPE_WINDOW = 8;
  static final byte MSG_TYPE_HEARTBEAT = 9;
  /**
   * Return the byte[] representation of this message.
@@ -57,6 +58,8 @@
   * MSG_TYPE_ACK
   * MSG_TYPE_SERVER_START
   * MSG_TYPE_CHANGELOG_START
   * MSG_TYPE_WINDOW
   * MSG_TYPE_HEARTBEAT
   *
   * @return the byte[] representation of this message.
   */
@@ -101,6 +104,9 @@
      case MSG_TYPE_WINDOW:
        msg = new WindowMessage(buffer);
      break;
      case MSG_TYPE_HEARTBEAT:
        msg = new HeartbeatMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -120,7 +120,7 @@
    if (emptyOldChanges)
      state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
@@ -155,7 +155,7 @@
          throws Exception, SocketException
  {
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
@@ -164,7 +164,7 @@
    return broker;
  }
  /**
   * Open a changelog session with flow control to the local Changelog server.
   *
@@ -179,7 +179,8 @@
    if (emptyOldChanges)
      state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
        state, baseDn, serverId, maxRcvQueue, 0,
        maxSendQueue, 0, window_size, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -46,6 +46,7 @@
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -311,6 +312,100 @@
  }
  /**
   * Tests whether the synchronization provider fails over when it loses
   * the heartbeat from the synchronization server.
   */
  @Test(groups = "slow")
  public void lostHeartbeatFailover() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : lostHeartbeatFailover" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the changelog server using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the changelog server.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(gen.NewChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID,
        baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    broker.publish(addMsg);
    entryList.add(personWithUUIDEntry.getDN());
    Entry resultEntry;
    // Check that the entry has been created in the directory server.
    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, true);
    assertNotNull(resultEntry,
        "The ADD synchronization message was not replayed");
    // Send a first modify operation message.
    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
    ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
        personWithUUIDEntry.getDN(), mods,
        user1entryUUID);
    broker.publish(modMsg);
    // Check that the modify has been replayed.
    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
                           "telephonenumber", "01 02 45", 10000, true);
    if (!found)
    {
      fail("The first modification was not replayed.");
    }
    // Simulate loss of heartbeats.
    HeartbeatThread.setHeartbeatsDisabled(true);
    Thread.sleep(3000);
    HeartbeatThread.setHeartbeatsDisabled(false);
    // Send a second modify operation message.
    mods = generatemods("description", "Description was changed");
    modMsg = new ModifyMsg(gen.NewChangeNumber(),
        personWithUUIDEntry.getDN(), mods,
        user1entryUUID);
    broker.publish(modMsg);
    // Check that the modify has been replayed.
    found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
                                   "description", "Description was changed",
                                   10000, true);
    if (!found)
    {
      fail("The second modification was not replayed.");
    }
    // Delete the entries to clean the database.
    DeleteMsg delMsg =
      new DeleteMsg(personWithUUIDEntry.getDN().toString(),
          gen.NewChangeNumber(), user1entryUUID);
    broker.publish(delMsg);
    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false);
    // Check that the delete operation has been applied.
    assertNull(resultEntry,
        "The DELETE synchronization message was not replayed");
    broker.stop();
  }
  /**
   * Tests the naming conflict resolution code.
   * In this test, the local server act both as an LDAP server and
   * a changelog server that are inter-connected.
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -36,7 +36,6 @@
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import org.opends.server.api.ClientConnection;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -48,20 +47,9 @@
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.PendingChange;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddMsg;
import org.opends.server.synchronization.protocol.ChangelogStartMessage;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.ServerStartMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
@@ -187,7 +175,7 @@
         throws Exception
  {
    DN dn = DN.decode(rawdn);
    ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");;
    ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");
    // Check uuid
    assertEquals("fakeuniqueid", msg.getUniqueId());
@@ -221,7 +209,6 @@
  /**
   * Build some data for the DeleteMsg test below.
   * @throws DirectoryException
   */
  @DataProvider(name = "deleteEncodeDecode")
  public Object[][] createDelData() {
@@ -383,7 +370,7 @@
    //Create an Add operation and generate and Add msg from it
    DN dn = DN.decode(rawDN);
    addOp = new AddOperation((ClientConnection) connection,
    addOp = new AddOperation(connection,
        (long) 1, 1, null, dn, objectClassList, userAttList, opList);
    OperationContext opCtx = new AddContext(cn, "thisIsaUniqueID",
        "parentUniqueId");
@@ -404,7 +391,6 @@
  /**
   * Build some data for the AckMsg test below.
   * @throws DirectoryException
   */
  @DataProvider(name = "ackMsg")
  public Object[][] createAckData() {
@@ -451,7 +437,7 @@
    // Check that retrieved CN is OK
    msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes());
  }
  @DataProvider(name="serverStart")
  public Object [][] createServerStartMessageTestData() throws Exception
  {
@@ -469,16 +455,20 @@
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
        window, window, window, window, window, state);
        window, window, window, window, window, window, state);
    ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
    assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
    assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay());
    assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
  }
  @DataProvider(name="changelogStart")
  public Object [][] createChangelogStartMessageTestData() throws Exception
  {
@@ -486,7 +476,7 @@
    ServerState state = new ServerState();
    return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} };
  }
  /**
   * Test that changelogStartMessage encoding and decoding works
   * by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
@@ -496,7 +486,7 @@
         String url, ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
    ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
        url, baseDN, window, state);
    ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
@@ -506,7 +496,7 @@
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
  }
  /**
   * Test that WindowMessageTest encoding and decoding works
   * by checking that : msg == new WindowMessageTest(msg.getBytes()).