| | |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | |
| | | import org.opends.server.replication.protocol.HeartbeatMonitor; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplServerStartDSMsg; |
| | | import org.opends.server.replication.protocol.ReplServerStartMsg; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | |
| | | import org.opends.server.replication.protocol.ServerStartMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg; |
| | | import org.opends.server.replication.protocol.StartSessionMsg; |
| | | import org.opends.server.replication.protocol.StopMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.WindowMsg; |
| | |
| | | // Our replication domain |
| | | private ReplicationDomain domain = null; |
| | | |
| | | // Trick for avoiding a inner class for many parameters return for |
| | | // performPhaseOneHandshake method. |
| | | private String tmpReadableServerName = null; |
| | | /** |
| | | * The expected duration in milliseconds between heartbeats received |
| | | * from the replication server. Zero means heartbeats are off. |
| | |
| | | * @param groupId The group id of our domain. |
| | | * @param changeTimeHeartbeatInterval The interval (in ms) between Change |
| | | * time heartbeats are sent to the RS, |
| | | * or zero if no CN heartbeat shoud be sent. |
| | | * or zero if no CN heartbeat should be sent. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, String baseDn, int serverID2, int window, |
| | |
| | | |
| | | /** |
| | | * Bag class for keeping info we get from a server in order to compute the |
| | | * best one to connect to. |
| | | * best one to connect to. This is in fact a wrapper to a |
| | | * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). |
| | | */ |
| | | public static class ServerInfo |
| | | { |
| | | |
| | | private ServerState serverState = null; |
| | | private short protocolVersion; |
| | | private long generationId; |
| | | private byte groupId = (byte) -1; |
| | | private int serverId; |
| | | private String serverURL; |
| | | private String baseDn = null; |
| | | private int windowSize; |
| | | private ServerState serverState; |
| | | private boolean sslEncryption; |
| | | private int degradedStatusThreshold = -1; |
| | | // Keeps the -1 value if created with a ReplServerStartMsg |
| | | private int weight = -1; |
| | | // Keeps the -1 value if created with a ReplServerStartMsg |
| | | private int connectedDSNumber = -1; |
| | | |
| | | /** |
| | | * Constructor. |
| | | * @param serverState Server state of the RS |
| | | * @param groupId Group id of the RS |
| | | * Create a new instance of ServerInfo wrapping the passed message. |
| | | * @param msg Message to wrap. |
| | | * @return The new instance wrapping the passed message. |
| | | * @throws IllegalArgumentException If the passed message has an unexpected |
| | | * type. |
| | | */ |
| | | public ServerInfo(ServerState serverState, byte groupId) |
| | | public static ServerInfo newServerInfo( |
| | | ReplicationMsg msg) throws IllegalArgumentException |
| | | { |
| | | this.serverState = serverState; |
| | | this.groupId = groupId; |
| | | if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | // This is a ReplServerStartMsg (RS uses protocol V3 or under) |
| | | ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg; |
| | | return new ServerInfo(replServerStartMsg); |
| | | } |
| | | else if (msg instanceof ReplServerStartDSMsg) |
| | | { |
| | | // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher) |
| | | ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg; |
| | | return new ServerInfo(replServerStartDSMsg); |
| | | } |
| | | |
| | | // Unsupported message type: should not happen |
| | | throw new IllegalArgumentException("Unexpected PDU type: " + |
| | | msg.getClass().getName() + " :\n" + msg.toString()); |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ServerInfo object wrapping a ReplServerStartMsg. |
| | | * @param replServerStartMsg The ReplServerStartMsg this object will wrap. |
| | | */ |
| | | private ServerInfo(ReplServerStartMsg replServerStartMsg) |
| | | { |
| | | this.protocolVersion = replServerStartMsg.getVersion(); |
| | | this.generationId = replServerStartMsg.getGenerationId(); |
| | | this.groupId = replServerStartMsg.getGroupId(); |
| | | this.serverId = replServerStartMsg.getServerId(); |
| | | this.serverURL = replServerStartMsg.getServerURL(); |
| | | this.baseDn = replServerStartMsg.getBaseDn(); |
| | | this.windowSize = replServerStartMsg.getWindowSize(); |
| | | this.serverState = replServerStartMsg.getServerState(); |
| | | this.sslEncryption = replServerStartMsg.getSSLEncryption(); |
| | | this.degradedStatusThreshold = |
| | | replServerStartMsg.getDegradedStatusThreshold(); |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg. |
| | | * @param replServerStartDSMsg The ReplServerStartDSMsg this object will |
| | | * wrap. |
| | | */ |
| | | private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg) |
| | | { |
| | | this.protocolVersion = replServerStartDSMsg.getVersion(); |
| | | this.generationId = replServerStartDSMsg.getGenerationId(); |
| | | this.groupId = replServerStartDSMsg.getGroupId(); |
| | | this.serverId = replServerStartDSMsg.getServerId(); |
| | | this.serverURL = replServerStartDSMsg.getServerURL(); |
| | | this.baseDn = replServerStartDSMsg.getBaseDn(); |
| | | this.windowSize = replServerStartDSMsg.getWindowSize(); |
| | | this.serverState = replServerStartDSMsg.getServerState(); |
| | | this.sslEncryption = replServerStartDSMsg.getSSLEncryption(); |
| | | this.degradedStatusThreshold = |
| | | replServerStartDSMsg.getDegradedStatusThreshold(); |
| | | this.weight = replServerStartDSMsg.getWeight(); |
| | | this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return groupId; |
| | | } |
| | | |
| | | /** |
| | | * Get the server protocol version. |
| | | * @return the protocolVersion |
| | | */ |
| | | public short getProtocolVersion() |
| | | { |
| | | return protocolVersion; |
| | | } |
| | | |
| | | /** |
| | | * Get the generation id. |
| | | * @return the generationId |
| | | */ |
| | | public long getGenerationId() |
| | | { |
| | | return generationId; |
| | | } |
| | | |
| | | /** |
| | | * Get the server id. |
| | | * @return the serverId |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Get the server URL. |
| | | * @return the serverURL |
| | | */ |
| | | public String getServerURL() |
| | | { |
| | | return serverURL; |
| | | } |
| | | |
| | | /** |
| | | * Get the base dn. |
| | | * @return the baseDn |
| | | */ |
| | | public String getBaseDn() |
| | | { |
| | | return baseDn; |
| | | } |
| | | |
| | | /** |
| | | * Get the window size. |
| | | * @return the windowSize |
| | | */ |
| | | public int getWindowSize() |
| | | { |
| | | return windowSize; |
| | | } |
| | | |
| | | /** |
| | | * Get the ssl encryption. |
| | | * @return the sslEncryption |
| | | */ |
| | | public boolean isSslEncryption() |
| | | { |
| | | return sslEncryption; |
| | | } |
| | | |
| | | /** |
| | | * Get the degraded status threshold. |
| | | * @return the degradedStatusThreshold |
| | | */ |
| | | public int getDegradedStatusThreshold() |
| | | { |
| | | return degradedStatusThreshold; |
| | | } |
| | | |
| | | /** |
| | | * Get the weight. |
| | | * @return the weight. Null if this object is a wrapper for |
| | | * a ReplServerStartMsg. |
| | | */ |
| | | public int getWeight() |
| | | { |
| | | return weight; |
| | | } |
| | | |
| | | /** |
| | | * Get the connected DS number. |
| | | * @return the connectedDSNumber. Null if this object is a wrapper for |
| | | * a ReplServerStartMsg. |
| | | */ |
| | | public int getConnectedDSNumber() |
| | | { |
| | | return connectedDSNumber; |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | |
| | | } |
| | | |
| | | /** |
| | | * Contacts all replication servers to get information from them and being |
| | | * able to choose the more suitable. |
| | | * @return the collected information. |
| | | */ |
| | | private Map<String, ServerInfo> collectReplicationServersInfo() { |
| | | |
| | | Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>(); |
| | | |
| | | for (String server : servers) |
| | | { |
| | | // Connect to server and get info about it |
| | | ServerInfo serverInfo = performPhaseOneHandshake(server, false); |
| | | |
| | | // Store server info in list |
| | | if (serverInfo != null) |
| | | { |
| | | rsInfos.put(server, serverInfo); |
| | | } |
| | | } |
| | | |
| | | return rsInfos; |
| | | } |
| | | |
| | | /** |
| | | * Special aspects of connecting as ECL compared to connecting as data server |
| | | * are : |
| | | * - 1 single RS configured |
| | | * - so no choice of the prefered RS |
| | | * - so no choice of the preferred RS |
| | | * - No same groupID polling |
| | | * - ?? Heartbeat |
| | | * - Start handshake is : |
| | |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.servers.iterator().next(); |
| | | |
| | | ReplServerStartMsg inReplServerStartMsg |
| | | ReplServerStartDSMsg inReplServerStartDSMsg |
| | | = performECLPhaseOneHandshake(bestServer, true); |
| | | |
| | | if (inReplServerStartMsg!=null) |
| | | if (inReplServerStartDSMsg!=null) |
| | | performECLPhaseTwoHandshake(bestServer); |
| | | } |
| | | |
| | |
| | | */ |
| | | private void connectAsDataServer() |
| | | { |
| | | HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>(); |
| | | |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (domain != null) |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " + |
| | | " order to elect the prefered one"); |
| | | for (String server : servers) |
| | | { |
| | | // Connect to server and get reply message |
| | | ReplServerStartMsg replServerStartMsg = |
| | | performPhaseOneHandshake(server, false); |
| | | " order to elect the preferred one"); |
| | | |
| | | // Store reply message info in list |
| | | if (replServerStartMsg != null) |
| | | { |
| | | ServerInfo serverInfo = |
| | | new ServerInfo(replServerStartMsg.getServerState(), |
| | | replServerStartMsg.getGroupId()); |
| | | rsInfos.put(server, serverInfo); |
| | | } |
| | | } // for servers |
| | | // Get info from every available replication servers |
| | | Map<String, ServerInfo> rsInfos = collectReplicationServersInfo(); |
| | | |
| | | ReplServerStartMsg replServerStartMsg = null; |
| | | ServerInfo serverInfo = null; |
| | | |
| | | if (rsInfos.size() > 0) |
| | | { |
| | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "phase 2 : will perform PhaseOneH with the prefered RS."); |
| | | replServerStartMsg = performPhaseOneHandshake(bestServer, true); |
| | | "phase 2 : will perform PhaseOneH with the preferred RS."); |
| | | serverInfo = performPhaseOneHandshake(bestServer, true); |
| | | |
| | | if (replServerStartMsg != null) // Handshake phase 1 exchange went well |
| | | if (serverInfo != null) // Handshake phase 1 exchange went well |
| | | |
| | | { |
| | | ServerInfo bestServerInfo = rsInfos.get(bestServer); |
| | | |
| | | // Compute in which status we are starting the session to tell the RS |
| | | ServerStatus initStatus = |
| | | computeInitialServerStatus(replServerStartMsg.getGenerationId(), |
| | | bestServerInfo.getServerState(), |
| | | replServerStartMsg.getDegradedStatusThreshold(), |
| | | computeInitialServerStatus(serverInfo.getGenerationId(), |
| | | serverInfo.getServerState(), |
| | | serverInfo.getDegradedStatusThreshold(), |
| | | this.getGenerationID()); |
| | | |
| | | // Perfom session start (handshake phase 2) |
| | |
| | | * reconnection at that time to retrieve a server with our group |
| | | * id. |
| | | */ |
| | | byte tmpRsGroupId = bestServerInfo.getGroupId(); |
| | | byte tmpRsGroupId = serverInfo.getGroupId(); |
| | | boolean someServersWithSameGroupId = |
| | | hasSomeServerWithSameGroupId(topologyMsg.getRsList()); |
| | | |
| | |
| | | if ((tmpRsGroupId == groupId) || |
| | | ((tmpRsGroupId != groupId) && !someServersWithSameGroupId)) |
| | | { |
| | | replicationServer = tmpReadableServerName; |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | rsGroupId = replServerStartMsg.getGroupId(); |
| | | rsServerId = replServerStartMsg.getServerId(); |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | | maxSendWindow = serverInfo.getWindowSize(); |
| | | rsGroupId = serverInfo.getGroupId(); |
| | | rsServerId = serverInfo.getServerId(); |
| | | rsServerUrl = bestServer; |
| | | |
| | | // May have created a broker with null replication domain for |
| | |
| | | if (domain != null) |
| | | { |
| | | domain.sessionInitiated( |
| | | initStatus, replServerStartMsg.getServerState(), |
| | | replServerStartMsg.getGenerationId(), |
| | | initStatus, serverInfo.getServerState(), |
| | | serverInfo.getGenerationId(), |
| | | session); |
| | | } |
| | | receiveTopo(topologyMsg); |
| | |
| | | startSameGroupIdPoller(); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | | if (replServerStartMsg.getVersion() |
| | | if (serverInfo.getProtocolVersion() |
| | | >= ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | startChangeTimeHeartBeatPublishing(); |
| | |
| | | rcvWindow = maxRcvWindow; |
| | | connectPhaseLock.notify(); |
| | | |
| | | if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) || |
| | | (replServerStartMsg.getGenerationId() == -1)) |
| | | if ((serverInfo.getGenerationId() == this.getGenerationID()) || |
| | | (serverInfo.getGenerationId() == -1)) |
| | | { |
| | | Message message = |
| | | NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.getGenerationID()), |
| | | Long.toString(replServerStartMsg.getGenerationId())); |
| | | Long.toString(serverInfo.getGenerationId())); |
| | | logError(message); |
| | | } |
| | | } else |
| | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake |
| | | * (start messages exchange) and return the reply message from the replication |
| | | * server. |
| | | * server, wrapped in a ServerInfo object. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * Use true if want to perform handshake phase 2 with the same session |
| | | * and keep the session to create as the current one. |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * @return The answer from the server . Null if could not |
| | | * get an answer. |
| | | */ |
| | | private ReplServerStartMsg performPhaseOneHandshake(String server, |
| | | private ServerInfo performPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | ReplServerStartMsg replServerStartMsg = null; |
| | | ServerInfo serverInfo = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | if (keepConnection) |
| | | tmpReadableServerName = serverAddr.toString(); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | |
| | | localSession.publish(serverStartMsg); |
| | | |
| | | /* |
| | | * Read the ReplServerStartMsg that should come back. |
| | | * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come |
| | | * back. |
| | | */ |
| | | replServerStartMsg = (ReplServerStartMsg) localSession.receive(); |
| | | ReplicationMsg msg = localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + replServerStartMsg.toString()); |
| | | } |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + msg.toString()); |
| | | } |
| | | |
| | | // Wrap received message in a server info object |
| | | serverInfo = ServerInfo.newServerInfo(msg); |
| | | |
| | | // Sanity check |
| | | String repDn = replServerStartMsg.getBaseDn(); |
| | | String repDn = serverInfo.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | serverInfo.getProtocolVersion()); |
| | | localSession.setProtocolVersion(protocolVersion); |
| | | |
| | | |
| | |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RB, closing session after phase 1"); |
| | | |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end communications |
| | | if (!error) |
| | | { |
| | | try |
| | | { |
| | | localSession.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | } |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RB, closing session after phase 1"); |
| | | localSession.close(); |
| | | } catch (IOException e) |
| | | { |
| | |
| | | } |
| | | if (error) |
| | | { |
| | | replServerStartMsg = null; |
| | | serverInfo = null; |
| | | } // Be sure to return null. |
| | | |
| | | } |
| | |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerStartMsg; |
| | | return serverInfo; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * Use true if want to perform handshake phase 2 with the same session |
| | | * and keep the session to create as the current one. |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * @return The ReplServerStartDSMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private ReplServerStartMsg performECLPhaseOneHandshake(String server, |
| | | private ReplServerStartDSMsg performECLPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | ReplServerStartMsg replServerStartMsg = null; |
| | | ReplServerStartDSMsg replServerStartDSMsg = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | if (keepConnection) |
| | | tmpReadableServerName = serverAddr.toString(); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | |
| | | localSession.publish(serverStartECLMsg); |
| | | |
| | | // Read the ReplServerStartMsg that should come back. |
| | | replServerStartMsg = (ReplServerStartMsg) localSession.receive(); |
| | | replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() + |
| | | "\nAND RECEIVED:\n" + replServerStartMsg.toString()); |
| | | "\nAND RECEIVED:\n" + replServerStartDSMsg.toString()); |
| | | } |
| | | |
| | | // Sanity check |
| | | String repDn = replServerStartMsg.getBaseDn(); |
| | | String repDn = replServerStartDSMsg.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | |
| | | */ |
| | | if (keepConnection) |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | replServerStartDSMsg.getVersion()); |
| | | localSession.setProtocolVersion(protocolVersion); |
| | | |
| | | if (!isSslEncryption) |
| | |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RB, closing session after phase 1"); |
| | | |
| | | // V4 protocol introduces a StopMsg to properly end communications |
| | | if (!error) |
| | | { |
| | | try |
| | | { |
| | | localSession.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RB, closing session after phase 1"); |
| | | localSession.close(); |
| | | } catch (IOException e) |
| | | { |
| | |
| | | } |
| | | if (error) |
| | | { |
| | | replServerStartMsg = null; |
| | | replServerStartDSMsg = null; |
| | | } // Be sure to return null. |
| | | |
| | | } |
| | |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerStartMsg; |
| | | return replServerStartDSMsg; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The computed best replication server. |
| | | */ |
| | | public static String computeBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn, |
| | | byte groupId) |
| | | Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId) |
| | | { |
| | | /* |
| | | * Preference is given to servers with the requested group id: |
| | |
| | | */ |
| | | |
| | | // Filter for servers with same group id |
| | | HashMap<String, ServerInfo> sameGroupIdRsInfos = |
| | | Map<String, ServerInfo> sameGroupIdRsInfos = |
| | | new HashMap<String, ServerInfo>(); |
| | | |
| | | for (String repServer : rsInfos.keySet()) |
| | |
| | | * @return The computed best replication server. |
| | | */ |
| | | private static String searchForBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn) |
| | | Map<String, ServerInfo> rsInfos, int serverId2, String baseDn) |
| | | { |
| | | /* |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | |
| | | HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>(); |
| | | |
| | | /* |
| | | * Start loop to differenciate up to date servers from late ones. |
| | | * Start loop to differentiate up to date servers from late ones. |
| | | */ |
| | | ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2); |
| | | if (myChangeNumber == null) |
| | |
| | | if (ReplicationServer.isLocalReplicationServer(upServer)) |
| | | { |
| | | localRS = true; |
| | | break; |
| | | } |
| | | } |
| | | if (localRS) |
| | |
| | | new HeartbeatMonitor("Replication Heartbeat Monitor on RS " + |
| | | getReplicationServer() + " " + rsServerId + " for " + baseDn + |
| | | " in DS " + serverId, |
| | | session, heartbeatInterval); |
| | | session, heartbeatInterval, (protocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4)); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void reStart(ProtocolSession failingSession) |
| | | { |
| | | try |
| | | |
| | | if (failingSession != null) |
| | | { |
| | | if (failingSession != null) |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end communications |
| | | try |
| | | { |
| | | failingSession.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | try |
| | | { |
| | | failingSession.close(); |
| | | numLostConnections++; |
| | | } catch (IOException e1) |
| | | { |
| | | // ignore |
| | | } |
| | | } catch (IOException e1) |
| | | { |
| | | // ignore |
| | | numLostConnections++; |
| | | } |
| | | |
| | | if (failingSession == session) |
| | |
| | | TopologyMsg topoMsg = (TopologyMsg)msg; |
| | | receiveTopo(topoMsg); |
| | | } |
| | | else if (msg instanceof StopMsg) |
| | | { |
| | | /* |
| | | * RS performs a proper disconnection |
| | | */ |
| | | Message message = |
| | | NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer, |
| | | Integer.toString(rsServerId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | // Try to find a suitable RS |
| | | this.reStart(failingSession); |
| | | } |
| | | else |
| | | { |
| | | return msg; |
| | |
| | | |
| | | { |
| | | /* |
| | | * If we did not initiate the close on our side, log a message. |
| | | * We did not initiate the close on our side, log an error message. |
| | | */ |
| | | Message message = |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer, |
| | | ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer, |
| | | Integer.toString(rsServerId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | |
| | | rsGroupId = (byte) -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | try |
| | | |
| | | if (session != null) |
| | | { |
| | | if (session != null) |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end communications |
| | | try |
| | | { |
| | | session.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | } |
| | | |
| | |
| | | Collection<String> replicationServers, int window, long heartbeatInterval, |
| | | byte groupId) |
| | | { |
| | | // These parameters needs to be renegociated with the ReplicationServer |
| | | // These parameters needs to be renegotiated with the ReplicationServer |
| | | // so if they have changed, that requires restarting the session with |
| | | // the ReplicationServer. |
| | | Boolean needToRestartSession = false; |
| | |
| | | |
| | | private boolean debugEnabled() |
| | | { |
| | | return true; |
| | | return false; |
| | | } |
| | | |
| | | private static final void debugInfo(String s) |
| | |
| | | continue; |
| | | |
| | | // Connect to server and get reply message |
| | | ReplServerStartMsg replServerStartMsg = |
| | | ServerInfo serverInfo = |
| | | performPhaseOneHandshake(server, false); |
| | | |
| | | // Store reply message info in list |
| | | if (replServerStartMsg != null) |
| | | // Is it a server with our group id ? |
| | | if (serverInfo != null) |
| | | { |
| | | if (groupId == replServerStartMsg.getGroupId()) |
| | | if (groupId == serverInfo.getGroupId()) |
| | | { |
| | | // Found one server with the same group id as us, disconnect |
| | | // session to force reconnection to a server with same group |
| | |
| | | Byte.toString(groupId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | |
| | | if (protocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end |
| | | // communications |
| | | try |
| | | { |
| | | session.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | try |
| | | { |
| | | session.close(); |