mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.19.2013 55065c7531e93a725b02dc619f6c526228e768ce
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
@@ -72,10 +73,7 @@
  private static final DebugTracer TRACER = getTracer();
  private volatile boolean shutdown = false;
  private final Object startStopLock = new Object();
  /**
   * Replication server URLs under this format: "<code>hostname:port</code>".
   */
  private volatile Set<String> replicationServerUrls;
  private volatile ReplicationDomainCfg config;
  private volatile boolean connected = false;
  /**
   * String reported under CSN=monitor when there is no connected RS.
@@ -84,18 +82,13 @@
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile Session session;
  private final ServerState state;
  private final DN baseDN;
  private final int serverId;
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow = 100;
  private int halfRcvWindow = rcvWindow / 2;
  private int maxRcvWindow = rcvWindow;
  private int timeout = 0;
  private short protocolVersion;
  private ReplSessionSecurity replSessionSecurity;
  /** My group id. */
  private byte groupId = -1;
  /** The group id of the RS we are connected to. */
  private byte rsGroupId = -1;
  /** The server id of the RS we are connected to. */
@@ -117,11 +110,6 @@
  private Map<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor;
@@ -144,11 +132,6 @@
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /**
   * The expected period in milliseconds between these messages are sent
   * to the replication server. Zero means heartbeats are off.
   */
  private long changeTimeHeartbeatSendInterval = 0;
  /*
   * Properties for the last topology info received from the network.
   */
@@ -199,40 +182,23 @@
   * @param replicationDomain The replication domain that is creating us.
   * @param state The ServerState that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param baseDN The base DN that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param serverId The server ID that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param window The size of the send and receive window to use.
   * @param config The configuration to use.
   * @param generationId The generationId for the server associated to the
   * provided serverId and for the domain associated to the provided baseDN.
   * @param heartbeatInterval The interval (in ms) between heartbeats requested
   *        from the replicationServer, or zero if no heartbeats are requested.
   * @param replSessionSecurity The session security configuration.
   * @param groupId The group id of our domain.
   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
   *        time  heartbeats are sent to the RS,
   *        or zero if no CSN heartbeat should be sent.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, DN baseDN, int serverId, int window,
    long generationId, long heartbeatInterval,
    ReplSessionSecurity replSessionSecurity, byte groupId,
    long changeTimeHeartbeatInterval)
      ServerState state, ReplicationDomainCfg config, long generationId,
      ReplSessionSecurity replSessionSecurity)
  {
    this.domain = replicationDomain;
    this.baseDN = baseDN;
    this.serverId = serverId;
    this.state = state;
    this.config = config;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.replSessionSecurity = replSessionSecurity;
    this.groupId = groupId;
    this.generationID = generationId;
    this.heartbeatInterval = heartbeatInterval;
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
    this.rcvWindow = getMaxRcvWindow();
    this.halfRcvWindow = rcvWindow / 2;
    /*
     * Only create a monitor if there is a replication domain (this is not the
@@ -251,31 +217,7 @@
    synchronized (startStopLock)
    {
      shutdown = false;
      this.rcvWindow = this.maxRcvWindow;
      connect();
    }
  }
  /**
   * Start the ReplicationBroker.
   *
   * @param replicationServers list of servers used
   */
  public void start(Set<String> replicationServers)
  {
    synchronized (startStopLock)
    {
      // Open Socket to the ReplicationServer Send the Start message
      shutdown = false;
      this.replicationServerUrls = replicationServers;
      if (this.replicationServerUrls.size() < 1)
      {
        Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
        logError(message);
      }
      this.rcvWindow = this.maxRcvWindow;
      this.rcvWindow = getMaxRcvWindow();
      connect();
    }
  }
@@ -304,7 +246,22 @@
   */
  public int getServerId()
  {
    return serverId;
    return config.getServerId();
  }
  private DN getBaseDN()
  {
    return config.getBaseDN();
  }
  private Set<String> getReplicationServerUrls()
  {
    return config.getReplicationServer();
  }
  private byte getGroupId()
  {
    return (byte) config.getGroupId();
  }
  /**
@@ -358,7 +315,7 @@
      replicationServerInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : replicationServerUrls)
    for (String serverUrl : getReplicationServerUrls())
    {
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
@@ -725,7 +682,7 @@
  private void connect()
  {
    if (this.baseDN.toNormalizedString().equalsIgnoreCase(
    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
    {
      connectAsECL();
@@ -745,7 +702,7 @@
    Map<Integer, ReplicationServerInfo> rsInfos =
      new ConcurrentHashMap<Integer, ReplicationServerInfo>();
    for (String serverUrl : replicationServerUrls)
    for (String serverUrl : getReplicationServerUrls())
    {
      // Connect to server and get info about it
      ReplicationServerInfo replicationServerInfo =
@@ -782,7 +739,7 @@
  private void connectAsECL()
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.replicationServerUrls.iterator().next();
    String bestServer = getReplicationServerUrls().iterator().next();
    if (performPhaseOneHandshake(bestServer, true, true) != null)
    {
@@ -841,6 +798,9 @@
    synchronized (connectPhaseLock)
    {
      final int serverId = getServerId();
      final DN baseDN = getBaseDN();
      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
@@ -859,7 +819,7 @@
      {
        // At least one server answered, find the best one.
        electedRsInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, groupId, getGenerationID());
          replicationServerInfos, serverId, getGroupId(), getGenerationID());
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
@@ -940,8 +900,7 @@
          if (replicationServerInfos.size() > 0)
          {
            Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
                serverId,
                baseDN.toNormalizedString(),
                serverId, baseDN.toNormalizedString(),
                collectionToString(replicationServerInfos.keySet(), ", "));
            logError(message);
          }
@@ -969,6 +928,8 @@
  private void connectToReplicationServer(ReplicationServerInfo rsInfo,
      ServerStatus initStatus, TopologyMsg topologyMsg)
  {
    final int serverId = getServerId();
    final DN baseDN = getBaseDN();
    try
    {
      replicationServer = session.getReadableRemoteAddress();
@@ -1007,7 +968,7 @@
        }
      }
      sendWindow = new Semaphore(maxSendWindow);
      rcvWindow = maxRcvWindow;
      rcvWindow = getMaxRcvWindow();
      connected = true;
      /*
@@ -1020,6 +981,7 @@
            .getGenerationId(), session);
      }
      final byte groupId = getGroupId();
      if (getRsGroupId() != groupId)
      {
        /*
@@ -1094,8 +1056,8 @@
      int nChanges = ServerState.diffChanges(rsState, state);
      if (debugEnabled())
      {
        TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
            + serverId + " computed " + nChanges + " changes late.");
        TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
            + getServerId() + " computed " + nChanges + " changes late.");
      }
      /*
@@ -1157,15 +1119,15 @@
      StartMsg serverStartMsg;
      if (!isECL)
      {
        serverStartMsg = new ServerStartMsg(serverId, url,
            baseDN, maxRcvWindow, heartbeatInterval, state,
            getGenerationID(), isSslEncryption, groupId);
        serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      else
      {
        serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
            maxRcvWindow, heartbeatInterval, state,
            getGenerationID(), isSslEncryption, groupId);
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      localSession.publish(serverStartMsg);
@@ -1174,7 +1136,7 @@
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + serverStartMsg + "\nAND RECEIVED:\n" + msg);
      }
@@ -1184,10 +1146,10 @@
      // Sanity check
      DN repDN = replServerInfo.getBaseDN();
      if (!baseDN.equals(repDN))
      if (!getBaseDN().equals(repDN))
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
            repDN.toNormalizedString(), baseDN.toNormalizedString());
            repDN.toNormalizedString(), getBaseDN().toNormalizedString());
        return null;
      }
@@ -1222,20 +1184,21 @@
    }
    catch (ConnectException e)
    {
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
          server, baseDN.toNormalizedString());
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
          server, getBaseDN().toNormalizedString());
      return null;
    }
    catch (SocketTimeoutException e)
    {
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId,
          server, baseDN.toNormalizedString());
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
          server, getBaseDN().toNormalizedString());
      return null;
    }
    catch (Exception e)
    {
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
          server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      return null;
    }
    finally
@@ -1290,7 +1253,7 @@
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + startECLSessionMsg);
      }
@@ -1299,8 +1262,9 @@
      connected = true;
    } catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      logError(message);
      setSession(null);
@@ -1356,7 +1320,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
            + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
      }
@@ -1365,8 +1329,9 @@
      return topologyMsg;
    } catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      logError(message);
      setSession(null);
@@ -1405,7 +1370,6 @@
      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
      byte groupId, long generationId)
  {
    // Shortcut, if only one server, this is the best
    if (rsInfos.size() == 1)
    {
@@ -1503,13 +1467,12 @@
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.isLocallyConfigured())
      ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.isLocallyConfigured())
      {
        result.put(rsId, replicationServerInfo);
        result.put(entry.getKey(), rsInfo);
      }
    }
    return result;
@@ -1529,13 +1492,12 @@
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.getGroupId() == groupId)
      ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.getGroupId() == groupId)
      {
        result.put(rsId, replicationServerInfo);
        result.put(entry.getKey(), rsInfo);
      }
    }
    return result;
@@ -1561,13 +1523,13 @@
      new HashMap<Integer, ReplicationServerInfo>();
    boolean emptyState = true;
    for (Integer rsId : bestServers.keySet())
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.getGenerationId() == generationId)
      ReplicationServerInfo rsInfo = entry.getValue();
      if (rsInfo.getGenerationId() == generationId)
      {
        result.put(rsId, replicationServerInfo);
        if (!replicationServerInfo.serverState.isEmpty())
        result.put(entry.getKey(), rsInfo);
        if (!rsInfo.serverState.isEmpty())
          emptyState = false;
      }
    }
@@ -1576,12 +1538,12 @@
    {
      // If the RS with a generationId have all an empty state,
      // then the 'empty'(genId=-1) RSes are also candidate
      for (Integer rsId : bestServers.keySet())
      for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
      {
        ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
        if (replicationServerInfo.getGenerationId() == -1)
        ReplicationServerInfo rsInfo = entry.getValue();
        if (rsInfo.getGenerationId() == -1)
        {
          result.put(rsId, replicationServerInfo);
          result.put(entry.getKey(), rsInfo);
        }
      }
    }
@@ -1615,18 +1577,18 @@
    }
    /**
     * Find replication servers who are up to date (or more up to date than us,
     * Find replication servers that are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. If some servers more up to date, prefer this list but take
     * server id. If some servers are more up to date, prefer this list but take
     * only the latest CSN.
     */
    CSN latestRsCSN = null;
    for (Integer rsId : bestServers.keySet())
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      ServerState rsState = replicationServerInfo.getServerState();
      CSN rsCSN = rsState.getCSN(localServerId);
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
      if (rsCSN == null)
      {
        rsCSN = new CSN(0, 0, localServerId);
@@ -1639,7 +1601,7 @@
        {
          // This replication server has exactly the latest change from the
          // local server
          upToDateServers.put(rsId, replicationServerInfo);
          upToDateServers.put(rsId, rsInfo);
        } else
        {
          // This replication server is even more up to date than the local
@@ -1653,13 +1615,13 @@
          {
            if (rsCSN.equals(latestRsCSN))
            {
              moreUpToDateServers.put(rsId, replicationServerInfo);
              moreUpToDateServers.put(rsId, rsInfo);
            } else
            {
              // This RS is even more up to date, clear the list and store this
              // new RS
              moreUpToDateServers.clear();
              moreUpToDateServers.put(rsId, replicationServerInfo);
              moreUpToDateServers.put(rsId, rsInfo);
              latestRsCSN = rsCSN;
            }
          }
@@ -1694,30 +1656,32 @@
     * Initially look for all servers on the same host. If we find one in the
     * same VM, then narrow the search.
     */
    boolean filterServersInSameVM = false;
    Map<Integer, ReplicationServerInfo> result =
    boolean foundRSInSameVM = false;
    final Map<Integer, ReplicationServerInfo> result =
        new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      final HostPort hp =
          HostPort.valueOf(replicationServerInfo.getServerURL());
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
      if (hp.isLocalAddress())
      {
        if (isLocalReplicationServerPort(hp.getPort()))
        {
          // An RS in the same VM will always have priority.
          if (!filterServersInSameVM)
          if (!foundRSInSameVM)
          {
            // An RS in the same VM will always have priority.
            // Narrow the search to only include servers in this VM.
            result.clear();
            filterServersInSameVM = true;
            foundRSInSameVM = true;
          }
          result.put(rsId, replicationServerInfo);
          result.put(rsId, rsInfo);
        }
        else if (!filterServersInSameVM)
        else if (!foundRSInSameVM)
        {
          result.put(rsId, replicationServerInfo);
          // OK, accept RSs on the same machine because we have not found an RS
          // in the same VM yet
          result.put(rsId, rsInfo);
        }
        else
        {
@@ -1775,19 +1739,19 @@
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the dot)
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    for (Integer rsId : bestServers.keySet())
    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      int rsWeight = replicationServerInfo.getWeight();
      //  load goal = rs weight / sum of weights
      BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
      BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide(
          BigDecimal.valueOf(sumOfWeights), mathContext);
      BigDecimal currentLoadBd = BigDecimal.ZERO;
      if (sumOfConnectedDSs != 0)
      {
        // current load = number of connected DSs / total number of DSs
        int connectedDSs = replicationServerInfo.getConnectedDSNumber();
        int connectedDSs = rsInfo.getConnectedDSNumber();
        currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
            BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
      }
@@ -1944,7 +1908,7 @@
                  mathContext);
          /*
          Now compare both values: we must no disconnect the DS if this
          Now compare both values: we must not disconnect the DS if this
          is for going in a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS
          or we would evaluate that we should disconnect just after being
@@ -2011,10 +1975,11 @@
  private void startRSHeartBeatMonitoring()
  {
    // Start a heartbeat monitor thread.
    final long heartbeatInterval = config.getHeartbeatInterval();
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
          baseDN.toNormalizedString(), session, heartbeatInterval);
          getBaseDN().toNormalizedString(), session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -2081,7 +2046,7 @@
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
              baseDN.toNormalizedString(), e.getLocalizedMessage()));
              getBaseDN().toNormalizedString(), e.getLocalizedMessage()));
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
        }
@@ -2339,6 +2304,8 @@
        break;
      }
      final int serverId = getServerId();
      final DN baseDN = getBaseDN();
      final int previousRsServerID = rsServerId;
      try
      {
@@ -2428,7 +2395,8 @@
              // best server checking.
              final ReplicationServerInfo bestServerInfo =
                  computeBestReplicationServer(false, previousRsServerID, state,
                      replicationServerInfos, serverId, groupId, generationID);
                      replicationServerInfos, serverId, getGroupId(),
                      generationID);
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
@@ -2520,7 +2488,7 @@
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    publish(new MonitorRequestMsg(serverId, getRsServerId()));
    publish(new MonitorRequestMsg(getServerId(), getRsServerId()));
    // wait for Response up to 10 seconds.
    try
@@ -2571,9 +2539,9 @@
  public void stop()
  {
    if (debugEnabled())
      TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will"
        + " close the connection to replication server " + rsServerId + " for"
        + " domain " + baseDN);
      TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
          + " and will close the connection to replication server "
          + rsServerId + " for domain " + getBaseDN());
    synchronized (startStopLock)
    {
@@ -2630,7 +2598,7 @@
   */
  public int getMaxRcvWindow()
  {
    return maxRcvWindow;
    return config.getWindowSize();
  }
  /**
@@ -2679,16 +2647,11 @@
  /**
   * Change some configuration parameters.
   *
   * @param replicationServers  The new list of replication servers.
   * @param window              The max window size.
   * @param heartbeatInterval   The heartBeat interval.
   *
   * @param newConfig  The new config to use.
   * @return                    A boolean indicating if the changes
   *                            requires to restart the service.
   * @param groupId            The new group id to use
   */
  public boolean changeConfig(Set<String> replicationServers, int window,
      long heartbeatInterval, byte groupId)
  public boolean changeConfig(ReplicationDomainCfg newConfig)
  {
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
@@ -2696,18 +2659,14 @@
    // A new session is necessary only when information regarding
    // the connection is modified
    boolean needToRestartSession =
        this.replicationServerUrls == null
        || !replicationServers.equals(this.replicationServerUrls)
        || window != this.maxRcvWindow
        || heartbeatInterval != this.heartbeatInterval
        || groupId != this.groupId;
        !newConfig.getReplicationServer().equals(config.getReplicationServer())
        || newConfig.getWindowSize() != config.getWindowSize()
        || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval()
        || newConfig.getGroupId() != config.getGroupId();
    this.replicationServerUrls = replicationServers;
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
    this.heartbeatInterval = heartbeatInterval;
    this.groupId = groupId;
    this.config = newConfig;
    this.rcvWindow = newConfig.getWindowSize();
    this.halfRcvWindow = this.rcvWindow / 2;
    return needToRestartSession;
  }
@@ -2756,23 +2715,14 @@
    } catch (IOException ex)
    {
      Message message = ERR_EXCEPTION_SENDING_CS.get(
        baseDN.toNormalizedString(),
        Integer.toString(serverId),
        getBaseDN().toNormalizedString(),
        Integer.toString(getServerId()),
        ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
      logError(message);
    }
  }
  /**
   * Sets the group id of the broker.
   * @param groupId The new group id.
   */
  public void setGroupId(byte groupId)
  {
    this.groupId = groupId;
  }
  /**
   * Gets the info for DSs in the topology (except us).
   * @return The info for DSs in the topology (except us)
   */
@@ -2815,7 +2765,7 @@
      sent by the replication server in the topology message. We must count
      ourselves as a connected server.
      */
      connectedDSs.add(serverId);
      connectedDSs.add(getServerId());
    }
    for (DSInfo dsInfo : dsList)
@@ -2907,21 +2857,23 @@
  /**
   * Starts publishing to the RS the current timestamp used in this server.
   */
  public void startChangeTimeHeartBeatPublishing()
  private void startChangeTimeHeartBeatPublishing()
  {
    // Start a CSN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
    if (changeTimeHeartbeatInterval > 0)
    {
      final Session localSession = session;
      final String threadName = "Replica DS(" + getServerId()
          + ") change time heartbeat publisher for domain \""
          + baseDN + "\" to RS(" + getRsServerId()
          + getBaseDN() + "\" to RS(" + getRsServerId()
          + ") at " + localSession.getReadableRemoteAddress();
      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
          threadName, localSession, changeTimeHeartbeatSendInterval, serverId);
          threadName, localSession, changeTimeHeartbeatInterval, getServerId());
      ctHeartbeatPublisherThread.start();
    } else
    }
    else
    {
      if (debugEnabled())
        TRACER.debugInfo(this
@@ -2932,7 +2884,7 @@
  /**
   * Stops publishing to the RS the current timestamp used in this server.
   */
  public synchronized void stopChangeTimeHeartBeatPublishing()
  private synchronized void stopChangeTimeHeartBeatPublishing()
  {
    if (ctHeartbeatPublisherThread != null)
    {
@@ -2942,17 +2894,6 @@
  }
  /**
   * Set a new change time heartbeat interval to this broker.
   * @param changeTimeHeartbeatInterval The new interval (in ms).
   */
  public void setChangeTimeHeartbeatInterval(int changeTimeHeartbeatInterval)
  {
    stopChangeTimeHeartBeatPublishing();
    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
    startChangeTimeHeartBeatPublishing();
  }
  /**
   * Set the connectRequiresRecovery to the provided value.
   * This flag is used to indicate if a recovery of Update is necessary
   * after a reconnection to a RS.
@@ -3044,8 +2985,9 @@
  {
    final StringBuilder sb = new StringBuilder();
    sb.append(getClass().getSimpleName())
      .append(" \"").append(baseDN).append(" ").append(serverId).append("\",")
      .append(" groupId=").append(groupId)
      .append(" \"").append(getBaseDN()).append(" ")
      .append(getServerId()).append("\",")
      .append(" groupId=").append(getGroupId())
      .append(", genId=").append(generationID)
      .append(", connected=").append(connected).append(", ");
    if (rsServerId == -1)