| | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.HostPort; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(); |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | | { |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | | } |
| | | else |
| | | { |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Contacts all replication servers to gather information from them, so that |
| | | * the most suitable one can be chosen. |
| | |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | // Connect to server + get and store info about it |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false); |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false); |
| | | final ReplicationServerInfo rsInfo = rs.rsInfo; |
| | | if (rsInfo != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Special aspects of connecting as ECL (External Change Log) compared to |
| | | * connecting as data server are : |
| | | * <ul> |
| | | * <li>1 single RS configured</li> |
| | | * <li>so no choice of the preferred RS</li> |
| | | * <li>?? Heartbeat</li> |
| | | * <li>Start handshake is : |
| | | * |
| | | * <pre> |
| | | * Broker ---> StartECLMsg ---> RS |
| | | * <---- ReplServerStartMsg --- |
| | | * ---> StartSessionECLMsg --> RS |
| | | * </pre> |
| | | * |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | final String bestServerURL = getReplicationServerUrls().iterator().next(); |
| | | final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true); |
| | | if (rs.isConnected()) |
| | | { |
| | | performECLPhaseTwoHandshake(bestServerURL, rs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Connect to a ReplicationServer. |
| | | * |
| | | * The handshake sequence between a DS and an RS is divided into 2 logical |
| | |
| | | + evals.getBestRS()); |
| | | |
| | | final ConnectedRS electedRS = performPhaseOneHandshake( |
| | | evals.getBestRS().getServerURL(), true, false); |
| | | evals.getBestRS().getServerURL(), true); |
| | | final ReplicationServerInfo electedRsInfo = electedRS.rsInfo; |
| | | if (electedRsInfo != null) |
| | | { |
| | |
| | | * Whether the session is kept open after the handshake. Use true to |
| | | * perform handshake phase 2 with the same session and keep the |
| | | * newly created session as the current one. |
| | | * @param isECL |
| | | * Indicates whether or not an ECL handshake is to be performed. |
| | | * @return The answer from the server. Null if no answer could be obtained. |
| | | */ |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, |
| | | boolean keepSession, boolean isECL) |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession) |
| | | { |
| | | Session newSession = null; |
| | | Socket socket = null; |
| | |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), |
| | | timeoutMS); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS); |
| | | newSession = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | boolean isSslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | |
| | | final HostPort hp = new HostPort( |
| | | socket.getLocalAddress().getHostName(), socket.getLocalPort()); |
| | | final String url = hp.toString(); |
| | | final StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | newSession.publish(serverStartMsg); |
| | | |
| | | // Read the ReplServerStartMsg or ReplServerStartDSMsg that should |
| | |
| | | return setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Performs the second phase of the External Change Log (ECL) handshake. |
| | | * <p> |
| | | * Publishes a StartECLSessionMsg (operation id "-1") on the session of the |
| | | * supplied replication server, sets that session's socket timeout to |
| | | * {@code timeout} and records the server as the connected RS. This method |
| | | * returns nothing and does not read any reply from the replication server. |
| | | * <p> |
| | | * If any exception is raised, a warning is logged, the session is closed |
| | | * and the connected RS is reset to {@code ConnectedRS.noConnectedRS()}. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @param rs |
| | | * Result of the phase one handshake; its session carries the exchange. |
| | | */ |
| | | private void performECLPhaseTwoHandshake(String server, ConnectedRS rs) |
| | | { |
| | | try |
| | | { |
| | | // Send our Start Session |
| | | final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg(); |
| | | startECLSessionMsg.setOperationId("-1"); |
| | | rs.session.publish(startECLSessionMsg); |
| | | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | rs.session.setSoTimeout(timeout); |
| | | setConnectedRS(rs); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e))); |
| | | |
| | | rs.session.close(); |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | |
| | | |
| | | try |
| | | { |
| | | connect(); |
| | | connectAsDataServer(); |
| | | rs = connectedRS.get(); |
| | | } |
| | | catch (Exception e) |
| | |
| | | Map<Integer, ReplicationServerInfo> previousRsInfos) |
| | | { |
| | | this.rsServerId = rsServerId; |
| | | this.replicaInfos = dsInfosToKeep; |
| | | this.replicaInfos = dsInfosToKeep == null |
| | | ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep; |
| | | this.rsInfos = computeRSInfos(dsServerId, newRSInfos, |
| | | previousRsInfos, configuredReplicationServerUrls); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos |
| | | return getClass().getSimpleName() |
| | | + " rsServerId=" + rsServerId |
| | | + ", replicaInfos=" + replicaInfos.values() |
| | | + ", rsInfos=" + rsInfos.values(); |
| | | } |
| | | } |