| | |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplServerStartDSMsg; |
| | | import org.opends.server.replication.protocol.ReplServerStartMsg; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.ServerStartECLMsg; |
| | | import org.opends.server.replication.protocol.ServerStartMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg; |
| | | import org.opends.server.replication.protocol.StartSessionMsg; |
| | | import org.opends.server.replication.protocol.StopMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.WindowMsg; |
| | | import org.opends.server.replication.protocol.WindowProbeMsg; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | |
| | | { |
| | | // Connect to server and get info about it |
| | | ReplicationServerInfo replicationServerInfo = |
| | | performPhaseOneHandshake(server, false); |
| | | performPhaseOneHandshake(server, false, false); |
| | | |
| | | // Store server info in list |
| | | if (replicationServerInfo != null) |
| | |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.servers.iterator().next(); |
| | | |
| | | ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake( |
| | | bestServer, true); |
| | | |
| | | if (inReplServerStartDSMsg != null) |
| | | if (performPhaseOneHandshake(bestServer, true, true) != null) |
| | | { |
| | | performECLPhaseTwoHandshake(bestServer); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | " phase 2 : will perform PhaseOneH with the preferred RS=" |
| | | + replicationServerInfo); |
| | | replicationServerInfo = performPhaseOneHandshake( |
| | | replicationServerInfo.getServerURL(), true); |
| | | replicationServerInfo.getServerURL(), true, false); |
| | | |
| | | if (replicationServerInfo != null) |
| | | { |
| | |
| | | * Do we keep session opened or not after handshake. Use true if want |
| | | * to perform handshake phase 2 with the same session and keep the |
| | | * session to create as the current one. |
| | | * @param isECL |
| | | * Indicates whether or not an ECL handshake is to be performed.
| | | * @return The answer from the server . Null if could not get an answer. |
| | | */ |
| | | private ReplicationServerInfo performPhaseOneHandshake( |
| | | String server, boolean keepConnection) |
| | | String server, boolean keepConnection, boolean isECL) |
| | | { |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | |
| | | .isSslEncryption(server); |
| | | |
| | | // Send our ServerStartMsg. |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, |
| | | baseDn, maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(serverId, baseDn, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | } |
| | | localSession.publish(serverStartMsg); |
| | | |
| | | // Read the ReplServerStartMsg or ReplServerStartDSMsg that should |
| | |
| | | * replication server will use the same one (or an older one if it is an |
| | | * old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo |
| | | .getProtocolVersion()); |
| | | localSession.setProtocolVersion(protocolVersion); |
| | | final short localProtocolVersion = ProtocolVersion |
| | | .minWithCurrent(replServerInfo.getProtocolVersion()); |
| | | if (keepConnection) |
| | | { |
| | | protocolVersion = localProtocolVersion; |
| | | } |
| | | localSession.setProtocolVersion(localProtocolVersion); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake |
| | | * (start messages exchange) and return the reply message from the replication |
| | | * server. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * Use true if want to perform handshake phase 2 with the same session |
| | | * and keep the session to create as the current one. |
| | | * @return The ReplServerStartDSMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private ReplServerStartDSMsg performECLPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | // FIXME: this should be merged with performPhaseOneHandshake to avoid |
| | | // code/bug duplication. |
| | | ReplServerStartDSMsg replServerStartDSMsg = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | ProtocolSession localSession = null; |
| | | |
| | | boolean error = false; |
| | | try |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(serverAddr, 500); |
| | | localSession = replSessionSecurity.createClientSession(socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | | |
| | | // Send our start msg. |
| | | ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg( |
| | | baseDn, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), this.getGenerationID(), |
| | | isSslEncryption, |
| | | groupId); |
| | | localSession.publish(serverStartECLMsg); |
| | | |
| | | // Read the ReplServerStartMsg that should come back. |
| | | replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() + |
| | | "\nAND RECEIVED:\n" + replServerStartDSMsg.toString()); |
| | | } |
| | | |
| | | // Sanity check |
| | | String repDn = replServerStartDSMsg.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | | this.baseDn); |
| | | logError(message); |
| | | error = true; |
| | | } |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | if (keepConnection) |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartDSMsg.getVersion()); |
| | | localSession.setProtocolVersion(protocolVersion); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | localSession.stopEncryption(); |
| | | } |
| | | } catch (ConnectException e) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId, |
| | | server, baseDn); |
| | | |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | debugInfo(message.toString()); |
| | | } |
| | | } |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | if ((e instanceof SocketTimeoutException) && debugEnabled()) |
| | | { |
| | | debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn); |
| | | } |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | debugInfo(message.toString()); |
| | | } |
| | | error = true; |
| | | } |
| | | |
| | | // Close session if requested |
| | | if (!keepConnection || error) |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | if (debugEnabled()) { |
| | | debugInfo("In RB, closing session after phase 1"); |
| | | } |
| | | localSession.close(); |
| | | localSession = null; |
| | | } |
| | | if (error) |
| | | { |
| | | replServerStartDSMsg = null; |
| | | } // Be sure to return null. |
| | | |
| | | } |
| | | |
| | | // If this connection as the one to use for sending and receiving updates, |
| | | // store it. |
| | | if (keepConnection) |
| | | { |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerStartDSMsg; |
| | | } |
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |