mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.09.2014 d709a2e4eecc9773af376587c476e33f0ccefce5
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -163,14 +163,14 @@
  /**
   * String reported under CSN=monitor when there is no connected RS.
   */
  public static final String NO_CONNECTED_SERVER = "Not connected";
  static final String NO_CONNECTED_SERVER = "Not connected";
  private final ServerState state;
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow = 100;
  private int halfRcvWindow = rcvWindow / 2;
  private int timeout = 0;
  private ReplSessionSecurity replSessionSecurity;
  private final ReplSessionSecurity replSessionSecurity;
  /**
   * The RS this DS is currently connected to.
   * <p>
@@ -181,7 +181,7 @@
  private final AtomicReference<ConnectedRS> connectedRS =
      new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
  /** Our replication domain. */
  private ReplicationDomain domain;
  private final ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
@@ -384,17 +384,19 @@
   * updated with a info coming from received topology messages or monitoring
   * messages.
   */
  public static class ReplicationServerInfo
  static class ReplicationServerInfo
  {
    private RSInfo rsInfo;
    private short protocolVersion;
    private DN baseDN;
    private int windowSize;
    private ServerState serverState;
    private boolean sslEncryption;
    private final short protocolVersion;
    private final DN baseDN;
    private final int windowSize;
    // @NotNull
    private final ServerState serverState;
    private final boolean sslEncryption;
    private final int degradedStatusThreshold;
    /** Keeps the 0 value if created with a ReplServerStartMsg. */
    private int connectedDSNumber = 0;
    // @NotNull
    private Set<Integer> connectedDSs;
    /**
     * Is this RS locally configured? (the RS is recognized as a usable server).
@@ -410,7 +412,7 @@
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public static ReplicationServerInfo newInstance(
    private static ReplicationServerInfo newInstance(
      ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
    {
      final ReplicationServerInfo rsInfo = newInstance(msg);
@@ -426,7 +428,7 @@
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public static ReplicationServerInfo newInstance(ReplicationMsg msg)
    static ReplicationServerInfo newInstance(ReplicationMsg msg)
        throws IllegalArgumentException
    {
      if (msg instanceof ReplServerStartMsg)
@@ -459,7 +461,8 @@
          msg.getGenerationId(), msg.getGroupId(), 1);
      this.baseDN = msg.getBaseDN();
      this.windowSize = msg.getWindowSize();
      this.serverState = msg.getServerState();
      final ServerState ss = msg.getServerState();
      this.serverState = ss != null ? ss : new ServerState();
      this.sslEncryption = msg.getSSLEncryption();
      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
    }
@@ -478,13 +481,38 @@
      this.protocolVersion = msg.getVersion();
      this.baseDN = msg.getBaseDN();
      this.windowSize = msg.getWindowSize();
      this.serverState = msg.getServerState();
      final ServerState ss = msg.getServerState();
      this.serverState = ss != null ? ss : new ServerState();
      this.sslEncryption = msg.getSSLEncryption();
      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
      this.connectedDSNumber = msg.getConnectedDSNumber();
    }
    /**
     * Constructs a new replication server info with the passed RSInfo internal
     * values and the passed connected DSs.
     *
     * @param rsInfo
     *          The RSinfo to use for the update
     * @param connectedDSs
     *          The new connected DSs
     */
    ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
    {
      this.rsInfo =
          new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo
              .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
      this.protocolVersion = 0;
      this.baseDN = null;
      this.windowSize = 0;
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
      this.sslEncryption = false;
      this.degradedStatusThreshold = -1;
      this.serverState = new ServerState();
    }
    /**
     * Get the server state.
     * @return The server state
     */
@@ -596,26 +624,10 @@
    }
    /**
     * Constructs a new replication server info with the passed RSInfo
     * internal values and the passed connected DSs.
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
    {
      this.rsInfo = new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(),
          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
      this.degradedStatusThreshold = -1;
      this.serverState = new ServerState();
    }
    /**
     * Converts the object to a RSInfo object.
     * @return The RSInfo object matching this object.
     */
    public RSInfo toRSInfo()
    RSInfo toRSInfo()
    {
      return rsInfo;
    }
@@ -626,7 +638,7 @@
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public void update(RSInfo rsInfo, Set<Integer> connectedDSs)
    private void update(RSInfo rsInfo, Set<Integer> connectedDSs)
    {
      this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
@@ -644,16 +656,9 @@
     * Updates replication server info with the passed server state.
     * @param serverState The ServerState to use for the update
     */
    public void update(ServerState serverState)
    private void update(ServerState serverState)
    {
      if (this.serverState != null)
      {
        this.serverState.update(serverState);
      }
      else
      {
        this.serverState = serverState;
      }
      this.serverState.update(serverState);
    }
    /**
@@ -1050,7 +1055,7 @@
   * @param dsGenId The local generation id
   * @return The initial status
   */
  public ServerStatus computeInitialServerStatus(long rsGenId,
  private ServerStatus computeInitialServerStatus(long rsGenId,
    ServerState rsState, int degradedStatusThreshold, long dsGenId)
  {
    if (rsGenId == -1)
@@ -1554,7 +1559,7 @@
   * disconnect (so the best replication server is another one than the current
   * one). Null can only be returned when firstConnection is false.
   */
  public static RSEvaluations computeBestReplicationServer(
  static RSEvaluations computeBestReplicationServer(
      boolean firstConnection, int rsServerId, ServerState myState,
      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
      byte groupId, long generationId)
@@ -1929,7 +1934,7 @@
   *        when it is not connected to a replication server
   *        (currentRsServerId = -1)
   */
  public static void computeBestServerForWeight(RSEvaluations evals,
  static void computeBestServerForWeight(RSEvaluations evals,
      int currentRsServerId, int localServerId)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
@@ -2233,7 +2238,7 @@
  /**
   * Stop the heartbeat monitor thread.
   */
  synchronized void stopRSHeartBeatMonitoring()
  private synchronized void stopRSHeartBeatMonitoring()
  {
    if (heartbeatMonitor != null)
    {
@@ -2257,7 +2262,7 @@
   * @param failingSession the socket which failed
   * @param infiniteTry the socket which failed
   */
  public void reStart(Session failingSession, boolean infiniteTry)
  private void reStart(Session failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
    {
@@ -2332,7 +2337,7 @@
   * @param retryOnFailure Whether reconnect should automatically be done.
   * @return               Whether publish succeeded.
   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return publish(msg, false, retryOnFailure);
  }
@@ -2536,7 +2541,7 @@
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   */
  public ReplicationMsg receive(boolean reconnectToTheBestRS,
  ReplicationMsg receive(boolean reconnectToTheBestRS,
      boolean reconnectOnFailure, boolean returnOnTopoChange)
    throws SocketTimeoutException
  {
@@ -2898,7 +2903,7 @@
   * @return                    A boolean indicating if the changes
   *                            requires to restart the service.
   */
  public boolean changeConfig(ReplicationDomainCfg newConfig)
  boolean changeConfig(ReplicationDomainCfg newConfig)
  {
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
@@ -3315,7 +3320,7 @@
   *
   * @return true if the server could not connect to any Replication Server.
   */
  public boolean hasConnectionError()
  boolean hasConnectionError()
  {
    return connectionError;
  }
@@ -3374,7 +3379,7 @@
   * Returns whether the broker is shutting down.
   * @return whether the broker is shutting down.
   */
  public boolean shuttingDown()
  boolean shuttingDown()
  {
    return shutdown;
  }