| | |
| | | /** |
| | | * Immutable class containing information about whether the broker is |
| | | * connected to an RS and data associated to this connected RS. |
| | | * <p> |
| | | * Mutable methods return a new version of this object copying the data that |
| | | * did not change. |
| | | */ |
| | | // @Immutable |
| | | private static final class ConnectedRS |
| | | { |
| | | |
| | | private final String replicationServer; |
| | | private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS( |
| | | NO_CONNECTED_SERVER); |
| | | |
| | | /** The info of the RS we are connected to. */ |
| | | private final ReplicationServerInfo rsInfo; |
| | | private final boolean connected; |
| | | private final Session session; |
| | | private final String replicationServer; |
| | | |
| | | private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo, |
| | | String replicationServer) |
| | | private ConnectedRS(String replicationServer) |
| | | { |
| | | this.connected = connected; |
| | | this.rsInfo = rsInfo; |
| | | this.rsInfo = null; |
| | | this.session = null; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | private ConnectedRS(ReplicationServerInfo rsInfo, Session session) |
| | | { |
| | | this.rsInfo = rsInfo; |
| | | this.session = session; |
| | | this.replicationServer = session != null ? |
| | | session.getReadableRemoteAddress() |
| | | : NO_CONNECTED_SERVER; |
| | | } |
| | | |
| | | private static ConnectedRS stopped() |
| | | { |
| | | return new ConnectedRS(false, null, "stopped"); |
| | | return new ConnectedRS("stopped"); |
| | | } |
| | | |
| | | private static ConnectedRS noConnectedRS() |
| | | { |
| | | return new ConnectedRS(false, null, NO_CONNECTED_SERVER); |
| | | } |
| | | |
| | | /** |
| | | * Returns a new version of the current object with the connected status set |
| | | * to true. |
| | | */ |
| | | private ConnectedRS setConnected() |
| | | { |
| | | return new ConnectedRS(true, rsInfo, replicationServer); |
| | | return NO_CONNECTED_RS; |
| | | } |
| | | |
| | | public int getServerId() |
| | |
| | | return rsInfo != null ? rsInfo.getGroupId() : -1; |
| | | } |
| | | |
| | | private boolean isConnected() |
| | | { |
| | | return session != null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | |
| | | |
| | | public void toString(StringBuilder sb) |
| | | { |
| | | sb.append("connected=").append(connected).append(", "); |
| | | if (rsInfo == null) // this is a null object |
| | | sb.append("connected=").append(isConnected()).append(", "); |
| | | if (!isConnected()) |
| | | { |
| | | sb.append("no connected RS"); |
| | | sb.append("no connectedRS"); |
| | | } |
| | | else |
| | | { |
| | | sb.append("connected RS(serverId=").append(rsInfo.getServerId()) |
| | | sb.append("connectedRS(serverId=").append(rsInfo.getServerId()) |
| | | .append(", serverUrl=").append(rsInfo.getServerURL()) |
| | | .append(", groupId=").append(rsInfo.getGroupId()) |
| | | .append(")"); |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | * String reported under CSN=monitor when there is no connected RS. |
| | | */ |
| | | public final static String NO_CONNECTED_SERVER = "Not connected"; |
| | | private volatile Session session; |
| | | private final ServerState state; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow = 100; |
| | | private int halfRcvWindow = rcvWindow / 2; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | /** |
| | | * The RS this DS is currently connected to. |
| | | * <p> |
| | | * Always use {@link #setConnectedRS(ConnectedRS)} to set a new |
| | | * connected RS. |
| | | */ |
| | | // @NotNull // for the reference |
| | | private final AtomicReference<ConnectedRS> connectedRS = |
| | | new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS()); |
| | | /** Our replication domain. */ |
| | | /** |
| | | * Our replication domain. |
| | | * <p> |
| | | * Can be null for unit test purpose. |
| | | */ |
| | | private ReplicationDomain domain; |
| | | /** |
| | | * This object is used as a conditional event to be notified about |
| | |
| | | private int mustRunBestServerCheckingAlgorithm = 0; |
| | | |
| | | /** |
| | | * The monitor provider for this replication domain. The name of the monitor |
| | | * includes the local address and must therefore be re-registered every time |
| | | * the session is re-established or destroyed. The monitor provider can only |
| | | * be created (i.e. non-null) if there is a replication domain, which is not |
| | | * the case in unit tests. |
| | | * The monitor provider for this replication domain. |
| | | * <p> |
| | | * The name of the monitor includes the local address and must therefore be |
| | | * re-registered every time the session is re-established or destroyed. The |
| | | * monitor provider can only be created (i.e. non-null) if there is a |
| | | * replication domain, which is not the case in unit tests. |
| | | */ |
| | | private final ReplicationMonitor monitor; |
| | | |
| | |
| | | this.domain = replicationDomain; |
| | | this.state = state; |
| | | this.config = config; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | this.generationID = generationId; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(connectedRS.get()); |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // This RS is locally configured, mark this |
| | | rsInfo.setLocallyConfigured(true); |
| | | rsInfo.serverURL = serverUrl; |
| | | rsInfo.setServerURL(serverUrl); |
| | | return; |
| | | } |
| | | } |
| | |
| | | */ |
| | | public static class ReplicationServerInfo |
| | | { |
| | | private RSInfo rsInfo; |
| | | private short protocolVersion; |
| | | private long generationId; |
| | | private byte groupId = -1; |
| | | private int serverId; |
| | | /** Received server URL. */ |
| | | private String serverURL; |
| | | private DN baseDN; |
| | | private int windowSize; |
| | | private ServerState serverState; |
| | | private boolean sslEncryption; |
| | | private int degradedStatusThreshold = -1; |
| | | /** Keeps the 1 value if created with a ReplServerStartMsg. */ |
| | | private int weight = 1; |
| | | private final int degradedStatusThreshold; |
| | | /** Keeps the 0 value if created with a ReplServerStartMsg. */ |
| | | private int connectedDSNumber = 0; |
| | | private List<Integer> connectedDSs; |
| | | private Set<Integer> connectedDSs; |
| | | /** |
| | | * Is this RS locally configured? (the RS is recognized as a usable server). |
| | | */ |
| | |
| | | public static ReplicationServerInfo newInstance( |
| | | ReplicationMsg msg, String newServerURL) throws IllegalArgumentException |
| | | { |
| | | ReplicationServerInfo rsInfo = newInstance(msg); |
| | | rsInfo.serverURL = newServerURL; |
| | | final ReplicationServerInfo rsInfo = newInstance(msg); |
| | | rsInfo.setServerURL(newServerURL); |
| | | return rsInfo; |
| | | } |
| | | |
| | |
| | | * @throws IllegalArgumentException If the passed message has an unexpected |
| | | * type. |
| | | */ |
| | | public static ReplicationServerInfo newInstance( |
| | | ReplicationMsg msg) throws IllegalArgumentException |
| | | public static ReplicationServerInfo newInstance(ReplicationMsg msg) |
| | | throws IllegalArgumentException |
| | | { |
| | | if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | // This is a ReplServerStartMsg (RS uses protocol V3 or under) |
| | | ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg; |
| | | return new ReplicationServerInfo(replServerStartMsg); |
| | | } else if (msg instanceof ReplServerStartDSMsg) |
| | | // RS uses protocol V3 or lower |
| | | return new ReplicationServerInfo((ReplServerStartMsg) msg); |
| | | } |
| | | else if (msg instanceof ReplServerStartDSMsg) |
| | | { |
| | | // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher) |
| | | ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg; |
| | | return new ReplicationServerInfo(replServerStartDSMsg); |
| | | // RS uses protocol V4 or higher |
| | | return new ReplicationServerInfo((ReplServerStartDSMsg) msg); |
| | | } |
| | | |
| | | // Unsupported message type: should not happen |
| | | throw new IllegalArgumentException("Unexpected PDU type: " + |
| | | msg.getClass().getName() + " :\n" + msg); |
| | | throw new IllegalArgumentException("Unexpected PDU type: " |
| | | + msg.getClass().getName() + " :\n" + msg); |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ReplicationServerInfo object wrapping a |
| | | * {@link ReplServerStartMsg}. |
| | | * |
| | | * @param replServerStartMsg |
| | | * @param msg |
| | | * The {@link ReplServerStartMsg} this object will wrap. |
| | | */ |
| | | private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg) |
| | | private ReplicationServerInfo(ReplServerStartMsg msg) |
| | | { |
| | | this.protocolVersion = replServerStartMsg.getVersion(); |
| | | this.generationId = replServerStartMsg.getGenerationId(); |
| | | this.groupId = replServerStartMsg.getGroupId(); |
| | | this.serverId = replServerStartMsg.getServerId(); |
| | | this.serverURL = replServerStartMsg.getServerURL(); |
| | | this.baseDN = replServerStartMsg.getBaseDN(); |
| | | this.windowSize = replServerStartMsg.getWindowSize(); |
| | | this.serverState = replServerStartMsg.getServerState(); |
| | | this.sslEncryption = replServerStartMsg.getSSLEncryption(); |
| | | this.degradedStatusThreshold = |
| | | replServerStartMsg.getDegradedStatusThreshold(); |
| | | this.protocolVersion = msg.getVersion(); |
| | | this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), |
| | | msg.getGenerationId(), msg.getGroupId(), 1); |
| | | this.baseDN = msg.getBaseDN(); |
| | | this.windowSize = msg.getWindowSize(); |
| | | this.serverState = msg.getServerState(); |
| | | this.sslEncryption = msg.getSSLEncryption(); |
| | | this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ReplicationServerInfo object wrapping a |
| | | * {@link ReplServerStartDSMsg}. |
| | | * |
| | | * @param replServerStartDSMsg |
| | | * @param msg |
| | | * The {@link ReplServerStartDSMsg} this object will wrap. |
| | | */ |
| | | private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg) |
| | | private ReplicationServerInfo(ReplServerStartDSMsg msg) |
| | | { |
| | | this.protocolVersion = replServerStartDSMsg.getVersion(); |
| | | this.generationId = replServerStartDSMsg.getGenerationId(); |
| | | this.groupId = replServerStartDSMsg.getGroupId(); |
| | | this.serverId = replServerStartDSMsg.getServerId(); |
| | | this.serverURL = replServerStartDSMsg.getServerURL(); |
| | | this.baseDN = replServerStartDSMsg.getBaseDN(); |
| | | this.windowSize = replServerStartDSMsg.getWindowSize(); |
| | | this.serverState = replServerStartDSMsg.getServerState(); |
| | | this.sslEncryption = replServerStartDSMsg.getSSLEncryption(); |
| | | this.degradedStatusThreshold = |
| | | replServerStartDSMsg.getDegradedStatusThreshold(); |
| | | this.weight = replServerStartDSMsg.getWeight(); |
| | | this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber(); |
| | | this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), |
| | | msg.getGenerationId(), msg.getGroupId(), msg.getWeight()); |
| | | this.protocolVersion = msg.getVersion(); |
| | | this.baseDN = msg.getBaseDN(); |
| | | this.windowSize = msg.getWindowSize(); |
| | | this.serverState = msg.getServerState(); |
| | | this.sslEncryption = msg.getSSLEncryption(); |
| | | this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); |
| | | this.connectedDSNumber = msg.getConnectedDSNumber(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | return rsInfo.getGroupId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getGenerationId() |
| | | { |
| | | return generationId; |
| | | return rsInfo.getGenerationId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | return rsInfo.getId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public String getServerURL() |
| | | { |
| | | return serverURL; |
| | | return rsInfo.getServerUrl(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getWeight() |
| | | { |
| | | return weight; |
| | | return rsInfo.getWeight(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param rsInfo The RSinfo to use for the update |
| | | * @param connectedDSs The new connected DSs |
| | | */ |
| | | public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs) |
| | | public ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs) |
| | | { |
| | | this.serverId = rsInfo.getId(); |
| | | this.serverURL = rsInfo.getServerUrl(); |
| | | this.generationId = rsInfo.getGenerationId(); |
| | | this.groupId = rsInfo.getGroupId(); |
| | | this.weight = rsInfo.getWeight(); |
| | | this.rsInfo = new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), |
| | | rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); |
| | | this.connectedDSs = connectedDSs; |
| | | this.connectedDSNumber = connectedDSs.size(); |
| | | this.degradedStatusThreshold = -1; |
| | | this.serverState = new ServerState(); |
| | | } |
| | | |
| | |
| | | */ |
| | | public RSInfo toRSInfo() |
| | | { |
| | | return new RSInfo(serverId, serverURL, generationId, groupId, weight); |
| | | return rsInfo; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param rsInfo The RSinfo to use for the update |
| | | * @param connectedDSs The new connected DSs |
| | | */ |
| | | public void update(RSInfo rsInfo, List<Integer> connectedDSs) |
| | | public void update(RSInfo rsInfo, Set<Integer> connectedDSs) |
| | | { |
| | | this.generationId = rsInfo.getGenerationId(); |
| | | this.groupId = rsInfo.getGroupId(); |
| | | this.weight = rsInfo.getWeight(); |
| | | this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(), |
| | | rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); |
| | | this.connectedDSs = connectedDSs; |
| | | this.connectedDSNumber = connectedDSs.size(); |
| | | } |
| | | |
| | | private void setServerURL(String newServerURL) |
| | | { |
| | | rsInfo = new RSInfo(rsInfo.getId(), newServerURL, |
| | | rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); |
| | | } |
| | | |
| | | /** |
| | | * Updates replication server info with the passed server state. |
| | | * @param serverState The ServerState to use for the update |
| | |
| | | if (this.serverState != null) |
| | | { |
| | | this.serverState.update(serverState); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | this.serverState = serverState; |
| | | } |
| | |
| | | * Get the getConnectedDSs. |
| | | * @return the getConnectedDSs |
| | | */ |
| | | public List<Integer> getConnectedDSs() |
| | | public Set<Integer> getConnectedDSs() |
| | | { |
| | | return connectedDSs; |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "Url:" + this.serverURL + " ServerId:" + this.serverId |
| | | + " GroupId:" + this.groupId; |
| | | return "Url:" + getServerURL() + " ServerId:" + getServerId() |
| | | + " GroupId:" + getGroupId(); |
| | | } |
| | | } |
| | | |
| | | private void connect(ConnectedRS rs) |
| | | private void connect() |
| | | { |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(rs); |
| | | connectAsECL(); |
| | | } |
| | | else |
| | | { |
| | |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | // Connect to server + get and store info about it |
| | | ReplicationServerInfo rsInfo = |
| | | performPhaseOneHandshake(serverUrl, false, false); |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false); |
| | | final ReplicationServerInfo rsInfo = rs.rsInfo; |
| | | if (rsInfo != null) |
| | | { |
| | | rsInfos.put(rsInfo.getServerId(), rsInfo); |
| | |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | | private void connectAsECL(ConnectedRS rs) |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | final String bestServer = getReplicationServerUrls().iterator().next(); |
| | | if (performPhaseOneHandshake(bestServer, true, true) != null) |
| | | final String bestServerURL = getReplicationServerUrls().iterator().next(); |
| | | final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true); |
| | | if (rs.isConnected()) |
| | | { |
| | | performECLPhaseTwoHandshake(bestServer, rs); |
| | | performECLPhaseTwoHandshake(bestServerURL, rs); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private void connectAsDataServer() |
| | | { |
| | | /* |
| | | May have created a broker with null replication domain for |
| | | unit test purpose. |
| | | */ |
| | | if (domain != null) |
| | | { |
| | | /* |
| | |
| | | |
| | | if (replicationServerInfos.isEmpty()) |
| | | { |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | else |
| | | { |
| | | // At least one server answered, find the best one. |
| | | RSEvaluations evals = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | ReplicationServerInfo electedRsInfo = evals.getBestRS(); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" |
| | | + electedRsInfo); |
| | | electedRsInfo = performPhaseOneHandshake( |
| | | electedRsInfo.getServerURL(), true, false); |
| | | + evals.getBestRS()); |
| | | |
| | | final ConnectedRS electedRS = performPhaseOneHandshake( |
| | | evals.getBestRS().getServerURL(), true, false); |
| | | final ReplicationServerInfo electedRsInfo = electedRS.rsInfo; |
| | | if (electedRsInfo != null) |
| | | { |
| | | /* |
| | |
| | | // Handshake phase 1 exchange went well |
| | | |
| | | // Compute in which status we are starting the session to tell the RS |
| | | ServerStatus initStatus = |
| | | computeInitialServerStatus(electedRsInfo.getGenerationId(), |
| | | electedRsInfo.getServerState(), |
| | | electedRsInfo.getDegradedStatusThreshold(), |
| | | getGenerationID()); |
| | | final ServerStatus initStatus = computeInitialServerStatus( |
| | | electedRsInfo.getGenerationId(), electedRsInfo.getServerState(), |
| | | electedRsInfo.getDegradedStatusThreshold(), getGenerationID()); |
| | | |
| | | // Perform session start (handshake phase 2) |
| | | TopologyMsg topologyMsg = performPhaseTwoHandshake( |
| | | electedRsInfo.getServerURL(), initStatus); |
| | | final TopologyMsg topologyMsg = |
| | | performPhaseTwoHandshake(electedRS, initStatus); |
| | | |
| | | if (topologyMsg != null) // Handshake phase 2 exchange went well |
| | | { |
| | | connectToReplicationServer(electedRsInfo, initStatus, topologyMsg); |
| | | connectToReplicationServer(electedRS, initStatus, topologyMsg); |
| | | } // Could perform handshake phase 2 with best |
| | | } // Could perform handshake phase 1 with best |
| | | } |
| | | |
| | | // connectedRS has been updated by calls above, reload it |
| | | final ConnectedRS rs = connectedRS.get(); |
| | | if (rs.connected) |
| | | if (rs.isConnected()) |
| | | { |
| | | connectPhaseLock.notify(); |
| | | |
| | |
| | | { |
| | | logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | | serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), getGenerationID())); |
| | | rs.replicationServer, getGenerationID())); |
| | | } |
| | | else |
| | | { |
| | | logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( |
| | | serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), getGenerationID(), rsGenId)); |
| | | rs.replicationServer, getGenerationID(), rsGenId)); |
| | | } |
| | | } |
| | | else |
| | |
| | | /** |
| | | * Connects to a replication server. |
| | | * |
| | | * @param rsInfo |
| | | * @param rs |
| | | * the Replication Server to connect to |
| | | * @param initStatus |
| | | * The status to enter the state machine with |
| | | * @param topologyMsg |
| | | * the message containing the topology information |
| | | */ |
| | | private void connectToReplicationServer(ReplicationServerInfo rsInfo, |
| | | private void connectToReplicationServer(ConnectedRS rs, |
| | | ServerStatus initStatus, TopologyMsg topologyMsg) |
| | | { |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | final ReplicationServerInfo rsInfo = rs.rsInfo; |
| | | |
| | | ConnectedRS rs = null; |
| | | boolean connectSuccessful = false; |
| | | try |
| | | { |
| | | maxSendWindow = rsInfo.getWindowSize(); |
| | | |
| | | receiveTopo(topologyMsg); |
| | | receiveTopo(topologyMsg, rs.getServerId()); |
| | | |
| | | /* |
| | | Log a message to let the administrator know that the failure |
| | | was resolved. |
| | | Log a message to let the administrator know that the failure was resolved. |
| | | Wake up all the thread that were waiting on the window |
| | | on the previous connection. |
| | | */ |
| | |
| | | } |
| | | sendWindow = new Semaphore(maxSendWindow); |
| | | rcvWindow = getMaxRcvWindow(); |
| | | rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress()); |
| | | connectedRS.set(rs); |
| | | |
| | | /* |
| | | May have created a broker with null replication domain for |
| | | unit test purpose. |
| | | */ |
| | | if (domain != null) |
| | | { |
| | | domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo |
| | | .getGenerationId(), session); |
| | | domain.sessionInitiated(initStatus, rsInfo.getServerState(), |
| | | rsInfo.getGenerationId(), rs.session); |
| | | } |
| | | |
| | | final byte groupId = getGroupId(); |
| | |
| | | logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rs.getServerId()), |
| | | rsInfo.getServerURL(), Byte.toString(rs.getGroupId()), |
| | | baseDN.toNormalizedString(), Integer.toString(serverId))); |
| | | baseDN.toNormalizedString(), Integer.toString(getServerId()))); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | | startRSHeartBeatMonitoring(rs); |
| | | if (rsInfo.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | startChangeTimeHeartBeatPublishing(); |
| | | startChangeTimeHeartBeatPublishing(rs); |
| | | } |
| | | setConnectedRS(rs); |
| | | connectSuccessful = true; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | logError(ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDN.toNormalizedString(), rsInfo.getServerURL(), |
| | | e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | if (rs == null) |
| | | if (!connectSuccessful) |
| | | { |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setSession(null); |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | } |
| | | } |
| | |
| | | * messages exchange) and return the reply message from the replication |
| | | * server, wrapped in a ReplicationServerInfo object. |
| | | * |
| | | * @param server |
| | | * @param serverURL |
| | | * Server to connect to. |
| | | * @param keepConnection |
| | | * @param keepSession |
| | | * Do we keep session opened or not after handshake. Use true if want |
| | | * to perform handshake phase 2 with the same session and keep the |
| | | * session to create as the current one. |
| | |
| | | * Indicates whether or not the an ECL handshake is to be performed. |
| | | * @return The answer from the server . Null if could not get an answer. |
| | | */ |
| | | private ReplicationServerInfo performPhaseOneHandshake( |
| | | String server, boolean keepConnection, boolean isECL) |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, |
| | | boolean keepSession, boolean isECL) |
| | | { |
| | | Session localSession = null; |
| | | Session newSession = null; |
| | | Socket socket = null; |
| | | boolean hasConnected = false; |
| | | Message errorMessage = null; |
| | |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); |
| | | socket.connect(HostPort.valueOf(server).toInetSocketAddress(), timeoutMS); |
| | | localSession = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), |
| | | timeoutMS); |
| | | newSession = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | boolean isSslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | | // Send our ServerStartMsg. |
| | | final HostPort hp = new HostPort( |
| | | socket.getLocalAddress().getHostName(), socket.getLocalPort()); |
| | | String url = hp.toString(); |
| | | StartMsg serverStartMsg; |
| | | final String url = hp.toString(); |
| | | final StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | localSession.publish(serverStartMsg); |
| | | newSession.publish(serverStartMsg); |
| | | |
| | | // Read the ReplServerStartMsg or ReplServerStartDSMsg that should |
| | | // come back. |
| | | ReplicationMsg msg = localSession.receive(); |
| | | ReplicationMsg msg = newSession.receive(); |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n" |
| | |
| | | |
| | | // Wrap received message in a server info object |
| | | final ReplicationServerInfo replServerInfo = |
| | | ReplicationServerInfo.newInstance(msg, server); |
| | | ReplicationServerInfo.newInstance(msg, serverURL); |
| | | |
| | | // Sanity check |
| | | final DN repDN = replServerInfo.getBaseDN(); |
| | |
| | | { |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get( |
| | | repDN.toNormalizedString(), getBaseDN().toNormalizedString()); |
| | | return null; |
| | | return setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | /* |
| | |
| | | * replication server will use the same one (or an older one if it is an |
| | | * old replication server). |
| | | */ |
| | | final short localProtocolVersion = getCompatibleVersion(replServerInfo |
| | | .getProtocolVersion()); |
| | | if (keepConnection) |
| | | { |
| | | protocolVersion = localProtocolVersion; |
| | | } |
| | | localSession.setProtocolVersion(localProtocolVersion); |
| | | newSession.setProtocolVersion( |
| | | getCompatibleVersion(replServerInfo.getProtocolVersion())); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | localSession.stopEncryption(); |
| | | newSession.stopEncryption(); |
| | | } |
| | | |
| | | hasConnected = true; |
| | | |
| | | // If this connection is the one to use for sending and receiving |
| | | // updates, store it. |
| | | if (keepConnection) |
| | | if (keepSession) |
| | | { |
| | | setSession(localSession); |
| | | // cannot store it yet, |
| | | // only store after a successful phase two handshake |
| | | return new ConnectedRS(replServerInfo, newSession); |
| | | } |
| | | |
| | | return replServerInfo; |
| | | return new ConnectedRS(replServerInfo, null); |
| | | } |
| | | catch (ConnectException e) |
| | | { |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | serverURL, getBaseDN().toNormalizedString()); |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | serverURL, getBaseDN().toNormalizedString()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString(), |
| | | serverURL, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | } |
| | | finally |
| | | { |
| | | if (!hasConnected || !keepConnection) |
| | | if (!hasConnected || !keepSession) |
| | | { |
| | | close(localSession); |
| | | close(newSession); |
| | | close(socket); |
| | | } |
| | | |
| | | if (keepConnection && !hasConnected) |
| | | { |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | if (!hasConnected && errorMessage != null && !connectionError) |
| | | { |
| | | // There was no server waiting on this host:port |
| | | // Log a notice and will try the next replicationServer in the list |
| | | if (keepConnection) // Log error message only for final connection |
| | | if (keepSession) // Log error message only for final connection |
| | | { |
| | | // log the error message only once to avoid overflowing the error log |
| | | logError(errorMessage); |
| | |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | return setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | |
| | |
| | | try |
| | | { |
| | | // Send our Start Session |
| | | StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg(); |
| | | final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg(); |
| | | startECLSessionMsg.setOperationId("-1"); |
| | | final Session localSession = session; |
| | | localSession.publish(startECLSessionMsg); |
| | | rs.session.publish(startECLSessionMsg); |
| | | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | localSession.setSoTimeout(timeout); |
| | | connectedRS.set(rs.setConnected()); |
| | | rs.session.setSoTimeout(timeout); |
| | | setConnectedRS(rs); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | stackTraceToSingleLineString(e))); |
| | | |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setSession(null); |
| | | rs.session.close(); |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | } |
| | | |
| | |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | | * replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @param electedRS Server we are connecting with. |
| | | * @param initStatus The status we are starting with |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private TopologyMsg performPhaseTwoHandshake(String server, |
| | | private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS, |
| | | ServerStatus initStatus) |
| | | { |
| | | try |
| | | { |
| | | /* |
| | | * Send our StartSessionMsg. |
| | | */ |
| | | StartSessionMsg startSessionMsg; |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | // Send our StartSessionMsg. |
| | | final StartSessionMsg startSessionMsg; |
| | | if (domain != null) |
| | | { |
| | | startSessionMsg = new StartSessionMsg( |
| | |
| | | startSessionMsg = |
| | | new StartSessionMsg(initStatus, new ArrayList<String>()); |
| | | } |
| | | final Session localSession = session; |
| | | localSession.publish(startSessionMsg); |
| | | final Session session = electedRS.session; |
| | | session.publish(startSessionMsg); |
| | | |
| | | // Read the TopologyMsg that should come back. |
| | | final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive(); |
| | | final TopologyMsg topologyMsg = (TopologyMsg) session.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | localSession.setSoTimeout(timeout); |
| | | session.setSoTimeout(timeout); |
| | | return topologyMsg; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), electedRS.rsInfo.getServerURL(), |
| | | getBaseDN().toNormalizedString(), stackTraceToSingleLineString(e))); |
| | | |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setSession(null); |
| | | |
| | | // Be sure to return null. |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | return null; |
| | | } |
| | | } |
| | |
| | | /** |
| | | * Start the heartbeat monitor thread. |
| | | */ |
| | | private void startRSHeartBeatMonitoring() |
| | | private void startRSHeartBeatMonitoring(ConnectedRS rs) |
| | | { |
| | | // Start a heartbeat monitor thread. |
| | | final long heartbeatInterval = config.getHeartbeatInterval(); |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(), |
| | | getBaseDN().toNormalizedString(), session, heartbeatInterval); |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(), |
| | | getBaseDN().toNormalizedString(), rs.session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void reStart(boolean infiniteTry) |
| | | { |
| | | reStart(session, infiniteTry); |
| | | reStart(connectedRS.get().session, infiniteTry); |
| | | } |
| | | |
| | | /** |
| | |
| | | numLostConnections++; |
| | | } |
| | | |
| | | ConnectedRS rs; |
| | | if (failingSession == session) |
| | | ConnectedRS rs = connectedRS.get(); |
| | | if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS())) |
| | | { |
| | | rs = ConnectedRS.noConnectedRS(); |
| | | connectedRS.set(rs); |
| | | setSession(null); |
| | | } else { |
| | | rs = connectedRS.get(); |
| | | rs = setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | while (true) |
| | |
| | | // Synchronize inside the loop in order to allow shutdown. |
| | | synchronized (startStopLock) |
| | | { |
| | | if (rs.connected || shutdown) |
| | | if (rs.isConnected() || shutdown) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | try |
| | | { |
| | | connect(rs); |
| | | connect(); |
| | | rs = connectedRS.get(); |
| | | } |
| | | catch (Exception e) |
| | |
| | | logError(mb.toMessage()); |
| | | } |
| | | |
| | | if (rs.connected || !infiniteTry) |
| | | if (rs.isConnected() || !infiniteTry) |
| | | { |
| | | break; |
| | | } |
| | |
| | | { |
| | | Thread.sleep(500); |
| | | } |
| | | catch (InterruptedException e) |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // ignore |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("end restart : connected=" + rs.connected + " with RS(" |
| | | debugInfo("end restart : connected=" + rs.isConnected() + " with RS(" |
| | | + rs.getServerId() + ") genId=" + generationID); |
| | | } |
| | | } |
| | |
| | | Semaphore currentWindowSemaphore; |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | currentSession = session; |
| | | currentSession = connectedRS.get().session; |
| | | currentWindowSemaphore = sendWindow; |
| | | } |
| | | |
| | |
| | | Check the session. If it has changed, some disconnection or |
| | | reconnection happened and we need to restart from scratch. |
| | | */ |
| | | final Session localSession = session; |
| | | if (localSession != null && session == currentSession) |
| | | final Session session = connectedRS.get().session; |
| | | if (session != null && session == currentSession) |
| | | { |
| | | localSession.publish(msg); |
| | | session.publish(msg); |
| | | done = true; |
| | | } |
| | | } |
| | |
| | | window update message was lost somehow... |
| | | then loop to check again if connection was closed. |
| | | */ |
| | | Session localSession = session; |
| | | if (localSession != null) |
| | | Session session = connectedRS.get().session; |
| | | if (session != null) |
| | | { |
| | | localSession.publish(new WindowProbeMsg()); |
| | | session.publish(new WindowProbeMsg()); |
| | | } |
| | | } |
| | | } |
| | | } catch (IOException e) |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (!retryOnFailure) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | // The receive threads should handle reconnection or |
| | | // mark this broker in error. Just retry. |
| | |
| | | { |
| | | while (!shutdown) |
| | | { |
| | | final ConnectedRS rs = connectedRS.get(); |
| | | if (reconnectOnFailure && !rs.connected) |
| | | ConnectedRS rs = connectedRS.get(); |
| | | if (reconnectOnFailure && !rs.isConnected()) |
| | | { |
| | | // infinite try to reconnect |
| | | reStart(null, true); |
| | | continue; |
| | | } |
| | | |
| | | // Save session information for later in case we need it for log messages |
| | | // after the session has been closed and/or failed. |
| | | final Session localSession = session; |
| | | if (localSession == null) |
| | | if (rs.session == null) |
| | | { |
| | | // Must be shutting down. |
| | | break; |
| | |
| | | final int previousRsServerID = rs.getServerId(); |
| | | try |
| | | { |
| | | ReplicationMsg msg = localSession.receive(); |
| | | ReplicationMsg msg = rs.session.receive(); |
| | | if (msg instanceof UpdateMsg) |
| | | { |
| | | synchronized (this) |
| | |
| | | } |
| | | if (msg instanceof WindowMsg) |
| | | { |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | final WindowMsg windowMsg = (WindowMsg) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | receiveTopo(topoMsg); |
| | | final TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | receiveTopo(topoMsg, getRsServerId()); |
| | | if (reconnectToTheBestRS) |
| | | { |
| | | // Reset wait time before next computation of best server |
| | |
| | | |
| | | // Caller wants to check what's changed |
| | | if (returnOnTopoChange) |
| | | { |
| | | return msg; |
| | | |
| | | } |
| | | } |
| | | else if (msg instanceof StopMsg) |
| | | { |
| | | // RS performs a proper disconnection |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get( |
| | | previousRsServerID, localSession.getReadableRemoteAddress(), |
| | | previousRsServerID, rs.replicationServer, |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | |
| | | // Try to find a suitable RS |
| | | reStart(localSession, true); |
| | | reStart(rs.session, true); |
| | | } |
| | | else if (msg instanceof MonitorMsg) |
| | | { |
| | |
| | | if (bestServerInfo == null) |
| | | { |
| | | message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( |
| | | serverId, previousRsServerID, |
| | | localSession.getReadableRemoteAddress(), |
| | | serverId, previousRsServerID, rs.replicationServer, |
| | | baseDN.toNormalizedString()); |
| | | } |
| | | else |
| | | { |
| | | final int bestRsServerId = bestServerInfo.getServerId(); |
| | | message = NOTE_NEW_BEST_REPLICATION_SERVER.get( |
| | | serverId, previousRsServerID, |
| | | localSession.getReadableRemoteAddress(), |
| | | serverId, previousRsServerID, rs.replicationServer, |
| | | bestRsServerId, |
| | | baseDN.toNormalizedString(), |
| | | evals.getEvaluation(previousRsServerID).toString(), |
| | |
| | | |
| | | if (!shutdown) |
| | | { |
| | | final Session tmpSession = session; |
| | | if (tmpSession == null || !tmpSession.closeInitiated()) |
| | | if (rs.session == null || !rs.session.closeInitiated()) |
| | | { |
| | | // We did not initiate the close on our side, log an error message. |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( |
| | | logError(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( |
| | | serverId, baseDN.toNormalizedString(), previousRsServerID, |
| | | localSession.getReadableRemoteAddress()); |
| | | logError(message); |
| | | rs.replicationServer)); |
| | | } |
| | | |
| | | if (reconnectOnFailure) |
| | | { |
| | | reStart(localSession, true); |
| | | } |
| | | else |
| | | if (!reconnectOnFailure) |
| | | { |
| | | break; // does not seem necessary to explicitly disconnect .. |
| | | } |
| | | |
| | | reStart(rs.session, true); |
| | | } |
| | | } |
| | | } // while !shutdown |
| | |
| | | try |
| | | { |
| | | updateDoneCount++; |
| | | final Session localSession = session; |
| | | if (updateDoneCount >= halfRcvWindow && localSession != null) |
| | | final Session session = connectedRS.get().session; |
| | | if (updateDoneCount >= halfRcvWindow && session != null) |
| | | { |
| | | localSession.publish(new WindowMsg(updateDoneCount)); |
| | | session.publish(new WindowMsg(updateDoneCount)); |
| | | rcvWindow += updateDoneCount; |
| | | updateDoneCount = 0; |
| | | } |
| | |
| | | synchronized (startStopLock) |
| | | { |
| | | shutdown = true; |
| | | setConnectedRS(ConnectedRS.stopped()); |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | connectedRS.set(ConnectedRS.stopped()); |
| | | setSession(null); |
| | | deregisterReplicationMonitor(); |
| | | } |
| | | } |
| | |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | { |
| | | this.timeout = timeout; |
| | | final Session localSession = session; |
| | | if (localSession != null) |
| | | final Session session = connectedRS.get().session; |
| | | if (session != null) |
| | | { |
| | | localSession.setSoTimeout(timeout); |
| | | session.setSoTimeout(timeout); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public short getProtocolVersion() |
| | | { |
| | | return protocolVersion; |
| | | final Session session = connectedRS.get().session; |
| | | if (session != null) |
| | | { |
| | | return session.getProtocolVersion(); |
| | | } |
| | | return ProtocolVersion.getCurrentVersion(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isConnected() |
| | | { |
| | | return connectedRS.get().connected; |
| | | return connectedRS.get().isConnected(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isSessionEncrypted() |
| | | { |
| | | final Session tmp = session; |
| | | return tmp != null ? tmp.isEncrypted() : false; |
| | | final Session session = connectedRS.get().session; |
| | | return session != null ? session.isEncrypted() : false; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS, |
| | | newStatus); |
| | | session.publish(csMsg); |
| | | connectedRS.get().session.publish( |
| | | new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus)); |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | |
| | | * Computes the list of DSs connected to a particular RS. |
| | | * @param rsId The RS id of the server one wants to know the connected DSs |
| | | * @param dsList The list of DSinfo from which to compute things |
| | | * @param rsServerId the serverId to use for the connectedDS |
| | | * @return The list of connected DSs to the server rsId |
| | | */ |
| | | private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList) |
| | | private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList, |
| | | int rsServerId) |
| | | { |
| | | List<Integer> connectedDSs = new ArrayList<Integer>(); |
| | | |
| | | if (getRsServerId() == rsId) |
| | | final Set<Integer> connectedDSs = new HashSet<Integer>(); |
| | | if (rsServerId == rsId) |
| | | { |
| | | /* |
| | | If we are computing connected DSs for the RS we are connected |
| | |
| | | for (DSInfo dsInfo : dsList) |
| | | { |
| | | if (dsInfo.getRsId() == rsId) |
| | | { |
| | | connectedDSs.add(dsInfo.getDsId()); |
| | | } |
| | | } |
| | | |
| | | return connectedDSs; |
| | |
| | | * Processes an incoming TopologyMsg. |
| | | * Updates the structures for the local view of the topology. |
| | | * |
| | | * @param topoMsg The topology information received from RS. |
| | | * @param topoMsg |
| | | * The topology information received from RS. |
| | | * @param rsServerId |
| | | * the serverId to use for the connectedDS |
| | | */ |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | private void receiveTopo(TopologyMsg topoMsg, int rsServerId) |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("receive TopologyMsg=" + topoMsg); |
| | |
| | | final Set<Integer> rssToKeep = new HashSet<Integer>(); |
| | | for (RSInfo rsInfo : topoMsg.getRsList()) |
| | | { |
| | | int rsId = rsInfo.getId(); |
| | | final int rsId = rsInfo.getId(); |
| | | rssToKeep.add(rsId); // Mark this server as still existing |
| | | final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList); |
| | | Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId); |
| | | ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId); |
| | | if (rsInfo2 == null) |
| | | { |
| | |
| | | rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs); |
| | | setLocallyConfiguredFlag(rsInfo2); |
| | | replicationServerInfos.put(rsId, rsInfo2); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | // Update the existing info for the replication server |
| | | rsInfo2.update(rsInfo, connectedDSs); |
| | |
| | | /** |
| | | * Starts publishing to the RS the current timestamp used in this server. |
| | | */ |
| | | private void startChangeTimeHeartBeatPublishing() |
| | | private void startChangeTimeHeartBeatPublishing(ConnectedRS rs) |
| | | { |
| | | // Start a CSN heartbeat thread. |
| | | long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval(); |
| | | if (changeTimeHeartbeatInterval > 0) |
| | | { |
| | | final Session localSession = session; |
| | | final String threadName = "Replica DS(" + getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + getBaseDN() + "\" to RS(" + getRsServerId() |
| | | + ") at " + localSession.getReadableRemoteAddress(); |
| | | + ") change time heartbeat publisher for domain \"" + getBaseDN() |
| | | + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer; |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |
| | | threadName, localSession, changeTimeHeartbeatInterval, getServerId()); |
| | | threadName, rs.session, changeTimeHeartbeatInterval, getServerId()); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } |
| | | else |
| | |
| | | */ |
| | | String getLocalUrl() |
| | | { |
| | | final Session tmp = session; |
| | | return tmp != null ? tmp.getLocalUrl() : ""; |
| | | final Session session = connectedRS.get().session; |
| | | return session != null ? session.getLocalUrl() : ""; |
| | | } |
| | | |
| | | /** |
| | |
| | | return monitor.getMonitorInstanceName(); |
| | | } |
| | | |
| | | private void setSession(final Session newSession) |
| | | private ConnectedRS setConnectedRS(final ConnectedRS newRS) |
| | | { |
| | | // De-register the monitor with the old name. |
| | | deregisterReplicationMonitor(); |
| | | |
| | | final Session oldSession = session; |
| | | if (oldSession != null) |
| | | final ConnectedRS oldRS = connectedRS.getAndSet(newRS); |
| | | if (!oldRS.equals(newRS) && oldRS.session != null) |
| | | { |
| | | oldSession.close(); |
| | | // monitor name is changing, deregister before registering again |
| | | deregisterReplicationMonitor(); |
| | | oldRS.session.close(); |
| | | registerReplicationMonitor(); |
| | | } |
| | | session = newSession; |
| | | |
| | | // Re-register the monitor with the new name. |
| | | registerReplicationMonitor(); |
| | | return newRS; |
| | | } |
| | | |
| | | /** |
| | | * Must be invoked each time the session changes because, the monitor name is |
| | | * dynamically created with the session name, while monitor registration is |
| | | * static. |
| | | * |
| | | * @see #monitor |
| | | */ |
| | | private void registerReplicationMonitor() |
| | | { |
| | | /* |
| | | * The monitor should not be registered if this is a unit test because the |
| | | * replication domain is null. |
| | | */ |
| | | // The monitor should not be registered if this is a unit test |
| | | // because the replication domain is null. |
| | | if (monitor != null) |
| | | { |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | |
| | | |
| | | private void deregisterReplicationMonitor() |
| | | { |
| | | /* |
| | | * The monitor should not be deregistered if this is a unit test because the |
| | | * replication domain is null. |
| | | */ |
| | | // The monitor should not be deregistered if this is a unit test |
| | | // because the replication domain is null. |
| | | if (monitor != null) |
| | | { |
| | | DirectoryServer.deregisterMonitorProvider(monitor); |