mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

coulbeck
02.50.2007 11859d9a6e466bab4ab73e1e46d092c6052acf68
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -94,6 +94,24 @@
  private int timeout = 0;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  /**
   * The number of times the connection was lost.
   */
  private int numLostConnections = 0;
  /**
   * Creates a new Changelog Broker for a particular SynchronizationDomain.
   *
   * @param state The ServerState that should be used by this broker
@@ -110,10 +128,12 @@
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * changelog server, or zero if no heartbeats are requested.
   */
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window)
      int maxSendDelay, int window, long heartbeatInterval)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -127,6 +147,7 @@
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
  }
  /**
@@ -157,7 +178,7 @@
  /**
   * Connect the Changelog server to other servers.
   * Connect to a Changelog server.
   *
   * @throws NumberFormatException address was invalid
   * @throws IOException error during connection phase
@@ -166,6 +187,13 @@
  {
    ChangelogStartMessage startMsg;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
    {
      heartbeatMonitor.shutdown();
      heartbeatMonitor = null;
    }
    boolean checkState = true;
    while( !connected)
    {
@@ -191,9 +219,9 @@
          /*
           * Send our ServerStartMessage.
           */
          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              halfRcvWindow*2, state);
              halfRcvWindow*2, heartbeatInterval, state);
          session.publish(msg);
@@ -369,6 +397,15 @@
        }
      }
    }
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
           new HeartbeatMonitor("Synchronization Heartbeat Monitor", session,
                                heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -379,6 +416,8 @@
   */
  private void reStart(ProtocolSession failingSession)
  {
    numLostConnections++;
    try
    {
      failingSession.close();
@@ -445,7 +484,7 @@
  /**
   * Receive a message.
   * @return the received message
   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   */
  public SynchronizationMessage receive() throws SocketTimeoutException
@@ -474,13 +513,11 @@
          }
          return msg;
        }
      } catch (SocketTimeoutException e)
      {
        throw e;
      } catch (Exception e)
      {
        if (e instanceof SocketTimeoutException)
        {
          SocketTimeoutException e1 = (SocketTimeoutException) e;
          throw e1;
        }
        if (shutdown == false)
        {
          synchronized (lock)
@@ -631,4 +668,13 @@
    else
      return 0;
  }
  /**
   * Get the number of times the connection was lost.
   * @return The number of times the connection was lost.
   */
  public int getNumLostConnections()
  {
    return numLostConnections;
  }
}