mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
13.23.2011 96eaa516a85e620a6b76a64ffbe71cdc6037e026
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.collectionToString;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -67,7 +68,6 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -871,7 +871,6 @@
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      String rsis = replicationServerInfos.toString();
      ReplicationServerInfo replicationServerInfo = null;
@@ -1030,22 +1029,18 @@
          this.getGenerationID()) ||
          (replicationServerInfo.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
            baseDn.toString(),
            Integer.toString(rsServerId),
            replicationServer,
            Integer.toString(serverId),
            Long.toString(this.getGenerationID()));
          Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDn,
                  session.getReadableRemoteAddress(),
                  getGenerationID());
          logError(message);
        } else
        {
          Message message =
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
            baseDn.toString(),
            replicationServer,
            Long.toString(this.getGenerationID()),
            Long.toString(replicationServerInfo.getGenerationId()));
          Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDn,
                  session.getReadableRemoteAddress(),
                  getGenerationID(),
                  replicationServerInfo.getGenerationId());
          logError(message);
        }
      } else
@@ -1058,9 +1053,22 @@
        {
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis);
          logError(message);
          if (replicationServerInfos.size() > 0)
          {
            Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
                serverId,
                baseDn,
                collectionToString(replicationServerInfos.keySet(),
                    ", "));
            logError(message);
          }
          else
          {
            Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
                serverId, baseDn);
            logError(message);
          }
        }
      }
    }
@@ -1149,30 +1157,33 @@
    }
  }
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * Connect to the provided server performing the first phase handshake (start
   * messages exchange) and return the reply message from the replication
   * server, wrapped in a ReplicationServerInfo object.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The answer from the server . Null if could not
   *         get an answer.
   * @param server
   *          Server to connect to.
   * @param keepConnection
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @return The answer from the server . Null if could not get an answer.
   */
  private ReplicationServerInfo performPhaseOneHandshake(String server,
    boolean keepConnection)
  private ReplicationServerInfo performPhaseOneHandshake(
      String server, boolean keepConnection)
  {
    ReplicationServerInfo replServerInfo = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    ProtocolSession localSession = null;
    boolean error = false;
    ProtocolSession localSession = null;
    Socket socket = null;
    boolean hasConnected = false;
    Message errorMessage = null;
    try
    {
      /*
@@ -1180,132 +1191,131 @@
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      Socket socket = new Socket();
          InetAddress.getByName(hostname), intPort);
      socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      localSession = replSessionSecurity.createClientSession(server, socket,
        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      /*
       * Send our ServerStartMsg.
       */
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
        maxRcvWindow, heartbeatInterval, state,
        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
        isSslEncryption,
        groupId);
      localSession = replSessionSecurity.createClientSession(
          socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT);
      boolean isSslEncryption = replSessionSecurity
          .isSslEncryption(server);
      // Send our ServerStartMsg.
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
          baseDn, maxRcvWindow, heartbeatInterval, state,
          ProtocolVersion.getCurrentVersion(),
          this.getGenerationID(), isSslEncryption, groupId);
      localSession.publish(serverStartMsg);
      /*
       * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
       * back.
       */
      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
      // come back.
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
          "\nAND RECEIVED:\n" + msg.toString());
        debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
            + serverStartMsg.toString() + "\nAND RECEIVED:\n"
            + msg.toString());
      }
      // Wrap received message in a server info object
      replServerInfo = ReplicationServerInfo.newInstance(msg);
      ReplicationServerInfo replServerInfo = ReplicationServerInfo
          .newInstance(msg);
      // Sanity check
      String repDn = replServerInfo.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
          this.baseDn);
        logError(message);
        error = true;
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
            this.baseDn);
        return null;
      }
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       * We have sent our own protocol version to the replication server. The
       * replication server will use the same one (or an older one if it is an
       * old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        replServerInfo.getProtocolVersion());
      protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo
          .getProtocolVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
      {
        localSession.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        if (keepConnection) // Log error message only for final connection
        {
          // the error message is only logged once to avoid overflowing
          // the error log
          logError(message);
        } else if (debugEnabled())
        {
          debugInfo(message.toString());
        }
      }
      error = true;
    } catch (Exception e)
    {
      if ((e instanceof SocketTimeoutException) && debugEnabled())
      {
        debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn);
      }
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      if (keepConnection) // Log error message only for final connection
      {
        logError(message);
      } else if (debugEnabled())
      {
        debugInfo(message.toString());
      }
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (localSession != null)
      hasConnected = true;
      // If this connection as the one to use for sending and receiving
      // updates, store it.
      if (keepConnection)
      {
        if (debugEnabled()) {
          debugInfo("In RB, closing session after phase 1");
        session = localSession;
      }
      return replServerInfo;
    }
    catch (ConnectException e)
    {
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
          server, baseDn);
      return null;
    }
    catch (SocketTimeoutException e)
    {
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId,
          server, baseDn);
      return null;
    }
    catch (Exception e)
    {
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
      return null;
    }
    finally
    {
      if (!hasConnected || !keepConnection)
      {
        if (localSession != null)
        {
          localSession.close();
        }
        localSession.close();
        localSession = null;
        if (socket != null)
        {
          try
          {
            socket.close();
          }
          catch (IOException e)
          {
            // Ignore.
          }
        }
      }
      if (error)
      if (!hasConnected && errorMessage != null)
      {
        replServerInfo = null;
      } // Be sure to return null.
        // There was no server waiting on this host:port Log a notice and try
        // the next replicationServer in the list
        if (!connectionError)
        {
          if (keepConnection) // Log error message only for final connection
          {
            // the error message is only logged once to avoid overflowing
            // the error log
            logError(errorMessage);
          }
          if (debugEnabled())
          {
            debugInfo(errorMessage.toString());
          }
        }
      }
    }
    // If this connection as the one to use for sending and receiving updates,
    // store it.
    if (keepConnection)
    {
      session = localSession;
    }
    return replServerInfo;
  }
  /**
@@ -1323,6 +1333,8 @@
  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    // FIXME: this should be merged with performPhaseOneHandshake to avoid
    // code/bug duplication.
    ReplServerStartDSMsg replServerStartDSMsg = null;
    // Parse server string.
@@ -1344,7 +1356,7 @@
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      localSession = replSessionSecurity.createClientSession(server, socket,
      localSession = replSessionSecurity.createClientSession(socket,
        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
@@ -1400,7 +1412,9 @@
       */
      if (!connectionError)
      {
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
            server, baseDn);
        if (keepConnection) // Log error message only for final connection
        {
          // the error message is only logged once to avoid overflowing
@@ -1419,9 +1433,8 @@
        debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn);
      }
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
      if (keepConnection) // Log error message only for final connection
      {
        logError(message);
@@ -1499,9 +1512,8 @@
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
      logError(message);
      if (session != null)
@@ -1573,9 +1585,8 @@
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
      logError(message);
      if (session != null)
@@ -2196,15 +2207,8 @@
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
      String threadName = "Replica DS("
          + this.getServerId() + ") heartbeat monitor for domain \""
          + this.baseDn + "\" from RS(" + this.getRsServerId()
          + ") at " + session.getReadableRemoteAddress();
      heartbeatMonitor = new HeartbeatMonitor(
          threadName,
          session,
          heartbeatInterval);
      heartbeatMonitor = new HeartbeatMonitor(getServerId(),
          getRsServerId(), baseDn, session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -2496,7 +2500,11 @@
        reStart(null, true);
      }
      ProtocolSession failingSession = session;
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final ProtocolSession failingSession = session;
      final int replicationServerID = rsServerId;
      try
      {
        ReplicationMsg msg = session.receive();
@@ -2530,11 +2538,12 @@
          /*
           * RS performs a proper disconnection
           */
          Message message =
            NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
            Integer.toString(rsServerId), baseDn.toString(),
            Integer.toString(serverId));
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
              .get(replicationServerID,
                  failingSession.getReadableRemoteAddress(),
                  serverId, baseDn);
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession, true);
        } else if (msg instanceof MonitorMsg)
@@ -2597,12 +2606,10 @@
                {
                  bestServerId = bestServerInfo.getServerId();
                }
                Message message =
                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
                  Integer.toString(serverId),
                  Integer.toString(rsServerId),
                  rsServerUrl,
                  Integer.toString(bestServerId));
                Message message = NOTE_NEW_BEST_REPLICATION_SERVER
                    .get(serverId, replicationServerID,
                        failingSession.getReadableRemoteAddress(),
                        bestServerId, baseDn);
                logError(message);
                reStart(true);
              }
@@ -2632,10 +2639,9 @@
            /*
             * We did not initiate the close on our side, log an error message.
             */
            Message message =
              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
              Integer.toString(rsServerId), baseDn.toString(),
              Integer.toString(serverId));
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
                .get(serverId, baseDn, replicationServerID,
                    failingSession.getReadableRemoteAddress());
            logError(message);
          }
          if (reconnectOnFailure)