mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
14.37.2009 5ec0cb08889c9f1a24fd4cc8b139dcdb942dd92a
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -116,8 +116,8 @@
  // performPhaseOneHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
@@ -142,6 +142,16 @@
  // Same group id poller thread
  private SameGroupIdPoller sameGroupIdPoller = null;
  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
  /**
   * The expected period in milliseconds between these messages are sent
   * to the replication server.  Zero means heartbeats are off.
   */
  private long changeTimeHeartbeatSendInterval = 0;
  /*
   * Properties for the last topology info received from the network.
   */
@@ -159,24 +169,27 @@
   *
   * @param replicationDomain The replication domain that is creating us.
   * @param state The ServerState that should be used by this broker
   *              when negotiating the session with the replicationServer.
   *        when negotiating the session with the replicationServer.
   * @param baseDn The base DN that should be used by this broker
   *              when negotiating the session with the replicationServer.
   *        when negotiating the session with the replicationServer.
   * @param serverId The server ID that should be used by this broker
   *              when negotiating the session with the replicationServer.
   *        when negotiating the session with the replicationServer.
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * replicationServer, or zero if no heartbeats are requested.
   *
   * @param generationId The generationId for the server associated to the
   * provided serverId and for the domain associated to the provided baseDN.
   * @param heartbeatInterval The interval (in ms) between heartbeats requested
   *        from the replicationServer, or zero if no heartbeats are requested.
   * @param replSessionSecurity The session security configuration.
   * @param groupId The group id of our domain.
   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
   *        time  heartbeats are sent to the RS,
   *        or zero if no CN heartbeat shoud be sent.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, String baseDn, short serverId, int window,
    long generationId, long heartbeatInterval,
    ReplSessionSecurity replSessionSecurity, byte groupId)
    ReplSessionSecurity replSessionSecurity, byte groupId,
    long changeTimeHeartbeatInterval)
  {
    this.domain = replicationDomain;
    this.baseDn = baseDn;
@@ -190,6 +203,7 @@
    this.maxRcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window /2;
    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
  }
  /**
@@ -392,7 +406,8 @@
    // Stop any existing poller and heartbeat monitor from a previous session.
    stopSameGroupIdPoller();
    stopHeartBeat();
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    boolean newServerWithSameGroupId = false;
    synchronized (connectPhaseLock)
@@ -508,7 +523,8 @@
                 logError(message);
                 startSameGroupIdPoller();
                }
                startHeartBeat();
                startRSHeartBeatMonitoring();
                startChangeTimeHeartBeatPublishing();
              } else
              {
                // Detected new RS with our group id: log disconnection to
@@ -1025,7 +1041,7 @@
      // Send our Start Session
      StartECLSessionMsg startECLSessionMsg = null;
      startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId(Short.toString(serverId));
      startECLSessionMsg.setOperationId("-1");
      session.publish(startECLSessionMsg);
      /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
@@ -1428,7 +1444,7 @@
  /**
   * Start the heartbeat monitor thread.
   */
  private void startHeartBeat()
  private void startRSHeartBeatMonitoring()
  {
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
@@ -1467,7 +1483,7 @@
  /**
   * Stop the heartbeat monitor thread.
   */
  void stopHeartBeat()
  void stopRSHeartBeatMonitoring()
  {
    if (heartbeatMonitor != null)
    {
@@ -1753,7 +1769,8 @@
        + " domain " + baseDn);
    }
    stopSameGroupIdPoller();
    stopHeartBeat();
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
@@ -2156,4 +2173,38 @@
  {
    return connectionError;
  }
  /**
   * Starts publishing to the RS the current timestamp used in this server.
   */
  public void startChangeTimeHeartBeatPublishing()
  {
    // Start a CN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
            "Replication CN Heartbeat Thread started for " +
            baseDn + " with " + getReplicationServer(),
            session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
    }
    else
    {
      TRACER.debugInfo(this +
          " is not configured to send CN heartbeat interval");
    }
  }
  /**
   * Stops publishing to the RS the current timestamp used in this server.
   */
  public void stopChangeTimeHeartBeatPublishing()
  {
    if (ctHeartbeatPublisherThread != null)
    {
      ctHeartbeatPublisherThread.shutdown();
      ctHeartbeatPublisherThread = null;
    }
  }
}