mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
15.30.2013 141b38df935b4bf865e1bcf2874aed1c70e3437c
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -82,7 +82,7 @@
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile Session session = null;
  private volatile Session session;
  private final ServerState state;
  private final DN baseDN;
  private final int serverId;
@@ -1284,7 +1284,8 @@
      // Send our Start Session
      StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId("-1");
      session.publish(startECLSessionMsg);
      final Session localSession = session;
      localSession.publish(startECLSessionMsg);
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
@@ -1294,7 +1295,7 @@
      }
      // Alright set the timeout to the desired value
      session.setSoTimeout(timeout);
      localSession.setSoTimeout(timeout);
      connected = true;
    } catch (Exception e)
    {
@@ -1319,8 +1320,6 @@
  private TopologyMsg performPhaseTwoHandshake(String server,
    ServerStatus initStatus)
  {
    TopologyMsg topologyMsg;
    try
    {
      /*
@@ -1347,12 +1346,13 @@
        startSessionMsg =
          new StartSessionMsg(initStatus, new ArrayList<String>());
      }
      session.publish(startSessionMsg);
      final Session localSession = session;
      localSession.publish(startSessionMsg);
      /*
       * Read the TopologyMsg that should come back.
       */
      topologyMsg = (TopologyMsg) session.receive();
      TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
      if (debugEnabled())
      {
@@ -1361,8 +1361,8 @@
      }
      // Alright set the timeout to the desired value
      session.setSoTimeout(timeout);
      localSession.setSoTimeout(timeout);
      return topologyMsg;
    } catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
@@ -1372,9 +1372,8 @@
      setSession(null);
      // Be sure to return null.
      topologyMsg = null;
      return null;
    }
    return topologyMsg;
  }
  /**
@@ -1423,6 +1422,7 @@
     *   local DS
     * - replication server in the same VM as local DS one
     */
    // TODO JNR log why an RS was evicted as best server
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    /*
    The list of best replication servers is filtered with each criteria. At
@@ -2225,10 +2225,10 @@
            Check the session. If it has changed, some disconnection or
            reconnection happened and we need to restart from scratch.
            */
            if (session != null && session == currentSession)
            final Session localSession = session;
            if (localSession != null && session == currentSession)
            {
              session.publish(msg);
              localSession.publish(msg);
              done = true;
            }
          }
@@ -2243,8 +2243,10 @@
            window update message was lost somehow...
            then loop to check again if connection was closed.
            */
            if (session != null) {
              session.publish(new WindowProbeMsg());
            Session localSession = session;
            if (localSession != null)
            {
              localSession.publish(new WindowProbeMsg());
            }
          }
        }
@@ -2330,8 +2332,8 @@
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final Session savedSession = session;
      if (savedSession == null)
      final Session localSession = session;
      if (localSession == null)
      {
        // Must be shutting down.
        break;
@@ -2340,7 +2342,7 @@
      final int previousRsServerID = rsServerId;
      try
      {
        ReplicationMsg msg = savedSession.receive();
        ReplicationMsg msg = localSession.receive();
        if (msg instanceof UpdateMsg)
        {
          synchronized (this)
@@ -2372,12 +2374,12 @@
        {
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              previousRsServerID, savedSession.getReadableRemoteAddress(),
              previousRsServerID, localSession.getReadableRemoteAddress(),
              serverId, baseDN.toNormalizedString());
          logError(message);
          // Try to find a suitable RS
          reStart(savedSession, true);
          reStart(localSession, true);
        }
        else if (msg instanceof MonitorMsg)
        {
@@ -2436,14 +2438,15 @@
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      localSession.getReadableRemoteAddress(),
                      baseDN.toNormalizedString());
                }
                else
                {
                  // TODO JNR log why an RS was evicted as best server
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      localSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
                }
@@ -2480,13 +2483,13 @@
            // We did not initiate the close on our side, log an error message.
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                serverId, baseDN.toNormalizedString(), previousRsServerID,
                savedSession.getReadableRemoteAddress());
                localSession.getReadableRemoteAddress());
            logError(message);
          }
          if (reconnectOnFailure)
          {
            reStart(savedSession, true);
            reStart(localSession, true);
          }
          else
          {
@@ -2546,9 +2549,10 @@
    try
    {
      updateDoneCount++;
      if ((updateDoneCount >= halfRcvWindow) && (session != null))
      final Session localSession = session;
      if (updateDoneCount >= halfRcvWindow && localSession != null)
      {
        session.publish(new WindowMsg(updateDoneCount));
        localSession.publish(new WindowMsg(updateDoneCount));
        rcvWindow += updateDoneCount;
        updateDoneCount = 0;
      }
@@ -2598,9 +2602,10 @@
  public void setSoTimeout(int timeout) throws SocketException
  {
    this.timeout = timeout;
    if (session != null)
    final Session localSession = session;
    if (localSession != null)
    {
      session.setSoTimeout(timeout);
      localSession.setSoTimeout(timeout);
    }
  }
@@ -2905,14 +2910,14 @@
    // Start a CSN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      String threadName = "Replica DS(" + getServerId()
      final Session localSession = session;
      final String threadName = "Replica DS(" + getServerId()
          + ") change time heartbeat publisher for domain \""
          + this.baseDN + "\" to RS(" + getRsServerId()
          + ") at " + session.getReadableRemoteAddress();
          + baseDN + "\" to RS(" + getRsServerId()
          + ") at " + localSession.getReadableRemoteAddress();
      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
          threadName, session, changeTimeHeartbeatSendInterval,
          serverId);
          threadName, localSession, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
    } else
    {
@@ -3030,4 +3035,11 @@
      DirectoryServer.deregisterMonitorProvider(monitor);
    }
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " " + baseDN + " " + serverId;
  }
}