| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import org.opends.server.replication.protocol.HeartbeatMonitor; |
| | | |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.HeartbeatMonitor; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplServerStartMsg; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.ServerStartECLMsg; |
| | | import org.opends.server.replication.protocol.ServerStartMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg; |
| | | import org.opends.server.replication.protocol.StartSessionMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.WindowMsg; |
| | | import org.opends.server.replication.protocol.WindowProbeMsg; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | | { |
| | | if (this.baseDn.compareToIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0) |
| | | { |
| | | connectAsECL(); |
| | | } |
| | | else |
| | | { |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Special aspects of connecting as ECL compared to connecting as data server |
| | | * are : |
| | | * - 1 single RS configured |
| | | * - so no choice of the prefered RS |
| | | * - No same groupID polling |
| | | * - ?? Heartbeat |
| | | * - Start handshake is : |
| | | * Broker ---> StartECLMsg ---> RS |
| | | * <---- ReplServerStartMsg --- |
| | | * ---> StartSessionECLMsg --> RS |
| | | */ |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.servers.iterator().next(); |
| | | |
| | | ReplServerStartMsg inReplServerStartMsg |
| | | = performECLPhaseOneHandshake(bestServer, true); |
| | | |
| | | if (inReplServerStartMsg!=null) |
| | | performECLPhaseTwoHandshake(bestServer); |
| | | } |
| | | |
| | | /** |
| | | * Connect to a ReplicationServer. |
| | | * |
| | |
| | | * |
| | | * @throws NumberFormatException address was invalid |
| | | */ |
| | | private void connect() |
| | | private void connectAsDataServer() |
| | | { |
| | | HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>(); |
| | | |
| | |
| | | * Connect to each replication server and get their ServerState then find |
| | | * out which one is the best to connect to. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " + |
| | | " order to elect the prefered one"); |
| | | for (String server : servers) |
| | | { |
| | | // Connect to server and get reply message |
| | |
| | | serverId, baseDn, groupId); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "phase 2 : will perform PhaseOneH with the prefered RS."); |
| | | replServerStartMsg = performPhaseOneHandshake(bestServer, true); |
| | | |
| | | if (replServerStartMsg != null) // Handshake phase 1 exchange went well |
| | |
| | | { |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RB, closing session after phase 1"); |
| | | localSession.close(); |
| | | } catch (IOException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake |
| | | * (start messages exchange) and return the reply message from the replication |
| | | * server. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * Use true if want to perform handshake phase 2 with the same session |
| | | * and keep the session to create as the current one. |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private ReplServerStartMsg performECLPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | ReplServerStartMsg replServerStartMsg = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | ProtocolSession localSession = null; |
| | | |
| | | boolean error = false; |
| | | try |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | if (keepConnection) |
| | | tmpReadableServerName = serverAddr.toString(); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(serverAddr, 500); |
| | | localSession = replSessionSecurity.createClientSession(server, socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | | |
| | | // Send our start msg. |
| | | ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg( |
| | | baseDn, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), this.getGenerationID(), |
| | | isSslEncryption, |
| | | groupId); |
| | | localSession.publish(serverStartECLMsg); |
| | | |
| | | // Read the ReplServerStartMsg that should come back. |
| | | replServerStartMsg = (ReplServerStartMsg) localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() + |
| | | "\nAND RECEIVED:\n" + replServerStartMsg.toString()); |
| | | } |
| | | |
| | | // Sanity check |
| | | String repDn = replServerStartMsg.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | | this.baseDn); |
| | | logError(message); |
| | | error = true; |
| | | } |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | if (keepConnection) |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | localSession.stopEncryption(); |
| | | } |
| | | } catch (ConnectException e) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(message.toString()); |
| | | } |
| | | } |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | if ( (e instanceof SocketTimeoutException) && debugEnabled() ) |
| | | { |
| | | TRACER.debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn); |
| | | } |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(message.toString()); |
| | | } |
| | | error = true; |
| | | } |
| | | |
| | | // Close session if requested |
| | | if (!keepConnection || error) |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RB, closing session after phase 1"); |
| | | localSession.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | localSession = null; |
| | | } |
| | | if (error) |
| | | { |
| | | replServerStartMsg = null; |
| | | } // Be sure to return null. |
| | | |
| | | } |
| | | |
| | | // If this connection as the one to use for sending and receiving updates, |
| | | // store it. |
| | | if (keepConnection) |
| | | { |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerStartMsg; |
| | | } |
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | | * replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @param initStatus The status we are starting with |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private TopologyMsg performECLPhaseTwoHandshake(String server) |
| | | { |
| | | TopologyMsg topologyMsg = null; |
| | | |
| | | try |
| | | { |
| | | // Send our Start Session |
| | | StartECLSessionMsg startECLSessionMsg = null; |
| | | startECLSessionMsg = new StartECLSessionMsg(); |
| | | startECLSessionMsg.setOperationId(Short.toString(serverId)); |
| | | session.publish(startECLSessionMsg); |
| | | |
| | | /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ? |
| | | * Read the TopologyMsg that should come back. |
| | | topologyMsg = (TopologyMsg) session.receive(); |
| | | */ |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString()); |
| | | // + "\nAND RECEIVED:\n" + topologyMsg.toString()); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | session.setSoTimeout(timeout); |
| | | connected = true; |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | if (session != null) |
| | | { |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException ex) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | } |
| | | // Be sure to return null. |
| | | topologyMsg = null; |
| | | } |
| | | return topologyMsg; |
| | | } |
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | | * replication server. |
| | |
| | | { |
| | | boolean done = false; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() + |
| | | " started."); |
| | | } |
| | | |
| | | while ((!done) && (!sameGroupIdPollershutdown)) |
| | | { |
| | | // Sleep some time between checks |
| | |
| | | */ |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn |
| | | + " received topology info update:\n" + topoMsg); |
| | | |
| | | // Store new lists |
| | | synchronized(getDsList()) |
| | | { |