| | |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | { |
| | | |
| | | /** |
| | | * Immutable class containing information about whether the broker is |
| | | * connected to an RS and data associated to this connected RS. |
| | | * <p> |
| | | * Mutable methods return a new version of this object copying the data that |
| | | * did not change. |
| | | */ |
| | | // @Immutable |
| | | private static final class ConnectedRS |
| | | { |
| | | |
| | | private final String replicationServer; |
| | | /** The info of the RS we are connected to. */ |
| | | private final ReplicationServerInfo rsInfo; |
| | | private final boolean connected; |
| | | |
| | | private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo, |
| | | String replicationServer) |
| | | { |
| | | this.connected = connected; |
| | | this.rsInfo = rsInfo; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | private static ConnectedRS stopped() |
| | | { |
| | | return new ConnectedRS(false, null, "stopped"); |
| | | } |
| | | |
| | | private static ConnectedRS noConnectedRS() |
| | | { |
| | | return new ConnectedRS(false, null, NO_CONNECTED_SERVER); |
| | | } |
| | | |
| | | /** |
| | | * Returns a new version of the current object with the connected status set |
| | | * to true. |
| | | */ |
| | | private ConnectedRS setConnected() |
| | | { |
| | | return new ConnectedRS(true, rsInfo, replicationServer); |
| | | } |
| | | |
| | | public int getServerId() |
| | | { |
| | | return rsInfo != null ? rsInfo.getServerId() : -1; |
| | | } |
| | | |
| | | private byte getGroupId() |
| | | { |
| | | return rsInfo != null ? rsInfo.getGroupId() : -1; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | final StringBuilder sb = new StringBuilder(); |
| | | toString(sb); |
| | | return sb.toString(); |
| | | } |
| | | |
| | | public void toString(StringBuilder sb) |
| | | { |
| | | sb.append("connected=").append(connected).append(", "); |
| | | if (rsInfo == null) // this is a null object |
| | | { |
| | | sb.append("no connected RS"); |
| | | } |
| | | else |
| | | { |
| | | sb.append("connected RS(serverId=").append(rsInfo.getServerId()) |
| | | .append(", serverUrl=").append(rsInfo.getServerURL()) |
| | | .append(", groupId=").append(rsInfo.getGroupId()) |
| | | .append(")"); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private volatile boolean shutdown = false; |
| | | private final Object startStopLock = new Object(); |
| | | private volatile ReplicationDomainCfg config; |
| | | private volatile boolean connected = false; |
| | | /** |
| | | * String reported under CSN=monitor when there is no connected RS. |
| | | */ |
| | | public final static String NO_CONNECTED_SERVER = "Not connected"; |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | | private volatile Session session; |
| | | private final ServerState state; |
| | | private Semaphore sendWindow; |
| | |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | /** The group id of the RS we are connected to. */ |
| | | private byte rsGroupId = -1; |
| | | /** The server id of the RS we are connected to. */ |
| | | private int rsServerId = -1; |
| | | /** The server URL of the RS we are connected to. */ |
| | | private String rsServerUrl; |
| | | private final AtomicReference<ConnectedRS> connectedRS = |
| | | new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS()); |
| | | /** Our replication domain. */ |
| | | private ReplicationDomain domain; |
| | | /** |
| | | * This object is used as a conditional event to be notified about |
| | | * the reception of monitor information from the Replication Server. |
| | | */ |
| | | private final MutableBoolean monitorResponse = new MutableBoolean(false); |
| | | private final AtomicBoolean monitorResponse = new AtomicBoolean(false); |
| | | /** |
| | | * A Map containing the ServerStates of all the replicas in the topology |
| | | * as seen by the ReplicationServer the last time it was polled or the last |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(); |
| | | connect(connectedRS.get()); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public byte getRsGroupId() |
| | | { |
| | | return rsGroupId; |
| | | return connectedRS.get().getGroupId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getRsServerId() |
| | | { |
| | | return rsServerId; |
| | | return connectedRS.get().getServerId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the server url of the RS we are connected to. |
| | | * @return The server url of the RS we are connected to |
| | | */ |
| | | public String getRsServerUrl() |
| | | { |
| | | return rsServerUrl; |
| | | } |
| | | |
| | | /** |
| | | * Sets the locally configured flag for the passed ReplicationServerInfo |
| | | * object, analyzing the local configuration. |
| | | * @param rsInfo the Replication server to check and update |
| | | */ |
| | | private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo) |
| | | private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo) |
| | | { |
| | | // Determine if the passed ReplicationServerInfo has a URL that is present |
| | | // in the locally configured replication servers |
| | |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | | private void connect(ConnectedRS rs) |
| | | { |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | | } else |
| | | connectAsECL(rs); |
| | | } |
| | | else |
| | | { |
| | | connectAsDataServer(); |
| | | } |
| | |
| | | */ |
| | | private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo() |
| | | { |
| | | Map<Integer, ReplicationServerInfo> rsInfos = |
| | | new ConcurrentHashMap<Integer, ReplicationServerInfo>(); |
| | | final Map<Integer, ReplicationServerInfo> rsInfos = |
| | | new ConcurrentSkipListMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | | private void connectAsECL() |
| | | private void connectAsECL(ConnectedRS rs) |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = getReplicationServerUrls().iterator().next(); |
| | | |
| | | final String bestServer = getReplicationServerUrls().iterator().next(); |
| | | if (performPhaseOneHandshake(bestServer, true, true) != null) |
| | | { |
| | | performECLPhaseTwoHandshake(bestServer); |
| | | performECLPhaseTwoHandshake(bestServer, rs); |
| | | } |
| | | } |
| | | |
| | |
| | | // Get info from every available replication servers |
| | | replicationServerInfos = collectReplicationServersInfo(); |
| | | |
| | | ReplicationServerInfo electedRsInfo = null; |
| | | |
| | | if (replicationServerInfos.size() > 0) |
| | | if (replicationServerInfos.isEmpty()) |
| | | { |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | } |
| | | else |
| | | { |
| | | // At least one server answered, find the best one. |
| | | RSEvaluations evals = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | electedRsInfo = evals.getBestRS(); |
| | | ReplicationServerInfo electedRsInfo = evals.getBestRS(); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | |
| | | { |
| | | connectToReplicationServer(electedRsInfo, initStatus, topologyMsg); |
| | | } // Could perform handshake phase 2 with best |
| | | |
| | | } // Could perform handshake phase 1 with best |
| | | } |
| | | |
| | | } // Reached some servers |
| | | |
| | | // connected is set by connectToReplicationServer() |
| | | // and electedRsInfo isn't null then. Check anyway |
| | | if (electedRsInfo != null && connected) |
| | | final ConnectedRS rs = connectedRS.get(); |
| | | if (rs.connected) |
| | | { |
| | | connectPhaseLock.notify(); |
| | | |
| | | if ((electedRsInfo.getGenerationId() == getGenerationID()) |
| | | || (electedRsInfo.getGenerationId() == -1)) |
| | | final long rsGenId = rs.rsInfo.getGenerationId(); |
| | | final int rsServerId = rs.rsInfo.getServerId(); |
| | | if (rsGenId == getGenerationID() || rsGenId == -1) |
| | | { |
| | | Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID(), |
| | | electedRsInfo.getGenerationId()); |
| | | logError(message); |
| | | logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | | serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), getGenerationID())); |
| | | } |
| | | } else |
| | | else |
| | | { |
| | | /* |
| | | * This server could not find any replicationServer. It's going to start |
| | | * in degraded mode. Log a message. |
| | | */ |
| | | connected = false; |
| | | replicationServer = NO_CONNECTED_SERVER; |
| | | |
| | | logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( |
| | | serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), getGenerationID(), rsGenId)); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // This server could not find any replicationServer. |
| | | // It's going to start in degraded mode. Log a message. |
| | | if (!connectionError) |
| | | { |
| | | connectionError = true; |
| | |
| | | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | Message message = WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | logError(WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, baseDN.toNormalizedString(), |
| | | collectionToString(replicationServerInfos.keySet(), ", ")); |
| | | logError(message); |
| | | collectionToString(replicationServerInfos.keySet(), ", "))); |
| | | } |
| | | else |
| | | { |
| | | Message message = WARN_NO_AVAILABLE_CHANGELOGS.get( |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | logError(WARN_NO_AVAILABLE_CHANGELOGS.get( |
| | | serverId, baseDN.toNormalizedString())); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | |
| | | ConnectedRS rs = null; |
| | | try |
| | | { |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | | maxSendWindow = rsInfo.getWindowSize(); |
| | | rsGroupId = rsInfo.getGroupId(); |
| | | rsServerId = rsInfo.getServerId(); |
| | | rsServerUrl = rsInfo.getServerURL(); |
| | | |
| | | receiveTopo(topologyMsg); |
| | | |
| | |
| | | } |
| | | sendWindow = new Semaphore(maxSendWindow); |
| | | rcvWindow = getMaxRcvWindow(); |
| | | connected = true; |
| | | rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress()); |
| | | connectedRS.set(rs); |
| | | |
| | | /* |
| | | May have created a broker with null replication domain for |
| | |
| | | } |
| | | |
| | | final byte groupId = getGroupId(); |
| | | if (getRsGroupId() != groupId) |
| | | if (rs.getGroupId() != groupId) |
| | | { |
| | | /* |
| | | Connected to replication server with wrong group id: |
| | | warn user and start heartbeat monitor to recover when a server |
| | | with the right group id shows up. |
| | | */ |
| | | Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rsServerId), |
| | | rsInfo.getServerURL(), Byte.toString(getRsGroupId()), |
| | | baseDN.toNormalizedString(), Integer.toString(serverId)); |
| | | logError(message); |
| | | logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rs.getServerId()), |
| | | rsInfo.getServerURL(), Byte.toString(rs.getGroupId()), |
| | | baseDN.toNormalizedString(), Integer.toString(serverId))); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | | if (rsInfo.getProtocolVersion() >= |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (!connected) |
| | | if (rs == null) |
| | | { |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setSession(null); |
| | | } |
| | | } |
| | |
| | | boolean isSslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | | // Send our ServerStartMsg. |
| | | String url = socket.getLocalAddress().getHostName() + ":" |
| | | + socket.getLocalPort(); |
| | | final HostPort hp = new HostPort( |
| | | socket.getLocalAddress().getHostName(), socket.getLocalPort()); |
| | | String url = hp.toString(); |
| | | StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | |
| | | } |
| | | |
| | | // Wrap received message in a server info object |
| | | ReplicationServerInfo replServerInfo = ReplicationServerInfo |
| | | .newInstance(msg, server); |
| | | final ReplicationServerInfo replServerInfo = |
| | | ReplicationServerInfo.newInstance(msg, server); |
| | | |
| | | // Sanity check |
| | | DN repDN = replServerInfo.getBaseDN(); |
| | | final DN repDN = replServerInfo.getBaseDN(); |
| | | if (!getBaseDN().equals(repDN)) |
| | | { |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get( |
| | |
| | | |
| | | hasConnected = true; |
| | | |
| | | // If this connection as the one to use for sending and receiving |
| | | // If this connection is the one to use for sending and receiving |
| | | // updates, store it. |
| | | if (keepConnection) |
| | | { |
| | |
| | | { |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | |
| | | close(socket); |
| | | } |
| | | |
| | | if (!hasConnected && errorMessage != null) |
| | | if (keepConnection && !hasConnected) |
| | | { |
| | | // There was no server waiting on this host:port Log a notice and try |
| | | // the next replicationServer in the list |
| | | if (!connectionError) |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | if (!hasConnected && errorMessage != null && !connectionError) |
| | | { |
| | | // There was no server waiting on this host:port |
| | | // Log a notice and will try the next replicationServer in the list |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | // log the error message only once to avoid overflowing the error log |
| | | logError(errorMessage); |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | |
| | | * |
| | | * @param server Server we are connecting with. |
| | | */ |
| | | private void performECLPhaseTwoHandshake(String server) |
| | | private void performECLPhaseTwoHandshake(String server, ConnectedRS rs) |
| | | { |
| | | try |
| | | { |
| | |
| | | |
| | | // Alright set the timeout to the desired value |
| | | localSession.setSoTimeout(timeout); |
| | | connected = true; |
| | | } catch (Exception e) |
| | | connectedRS.set(rs.setConnected()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setSession(null); |
| | | } |
| | | } |
| | |
| | | // unit test purpose. |
| | | if (domain != null) |
| | | { |
| | | startSessionMsg = |
| | | new StartSessionMsg( |
| | | startSessionMsg = new StartSessionMsg( |
| | | initStatus, |
| | | domain.getRefUrls(), |
| | | domain.isAssured(), |
| | |
| | | final Session localSession = session; |
| | | localSession.publish(startSessionMsg); |
| | | |
| | | /* |
| | | * Read the TopologyMsg that should come back. |
| | | */ |
| | | // Read the TopologyMsg that should come back. |
| | | final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | |
| | | // Alright set the timeout to the desired value |
| | | localSession.setSoTimeout(timeout); |
| | | return topologyMsg; |
| | | } catch (Exception e) |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | connectedRS.set(ConnectedRS.noConnectedRS()); |
| | | setSession(null); |
| | | |
| | | // Be sure to return null. |
| | |
| | | numLostConnections++; |
| | | } |
| | | |
| | | ConnectedRS rs; |
| | | if (failingSession == session) |
| | | { |
| | | connected = false; |
| | | rsGroupId = -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | rs = ConnectedRS.noConnectedRS(); |
| | | connectedRS.set(rs); |
| | | setSession(null); |
| | | } else { |
| | | rs = connectedRS.get(); |
| | | } |
| | | |
| | | while (true) |
| | |
| | | // Synchronize inside the loop in order to allow shutdown. |
| | | synchronized (startStopLock) |
| | | { |
| | | if (connected || shutdown) |
| | | if (rs.connected || shutdown) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | try |
| | | { |
| | | connect(); |
| | | connect(rs); |
| | | rs = connectedRS.get(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | logError(mb.toMessage()); |
| | | } |
| | | |
| | | if (connected || !infiniteTry) |
| | | if (rs.connected || !infiniteTry) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | } |
| | | try |
| | | { |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("end restart : connected=" + connected + " with RS(" |
| | | + getRsServerId() + ") genId=" + this.generationID); |
| | | debugInfo("end restart : connected=" + rs.connected + " with RS(" |
| | | + rs.getServerId() + ") genId=" + generationID); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | while (!shutdown) |
| | | { |
| | | if (reconnectOnFailure && !connected) |
| | | final ConnectedRS rs = connectedRS.get(); |
| | | if (reconnectOnFailure && !rs.connected) |
| | | { |
| | | // infinite try to reconnect |
| | | reStart(null, true); |
| | |
| | | |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | final int previousRsServerID = rsServerId; |
| | | final int previousRsServerID = rs.getServerId(); |
| | | try |
| | | { |
| | | ReplicationMsg msg = localSession.receive(); |
| | |
| | | public void stop() |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("is stopping and will close the connection to" |
| | | + " replication server " + rsServerId); |
| | | debugInfo("is stopping and will close the connection to RS(" |
| | | + getRsServerId() + ")"); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | | shutdown = true; |
| | | connected = false; |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | replicationServer = "stopped"; |
| | | rsGroupId = -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | connectedRS.set(ConnectedRS.stopped()); |
| | | setSession(null); |
| | | deregisterReplicationMonitor(); |
| | | } |
| | |
| | | */ |
| | | public String getReplicationServer() |
| | | { |
| | | return replicationServer; |
| | | return connectedRS.get().replicationServer; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | if (connected) |
| | | if (isConnected()) |
| | | { |
| | | return sendWindow.availablePermits(); |
| | | } |
| | |
| | | */ |
| | | public boolean isConnected() |
| | | { |
| | | return connected; |
| | | return connectedRS.get().connected; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | List<Integer> connectedDSs = new ArrayList<Integer>(); |
| | | |
| | | if (rsServerId == rsId) |
| | | if (getRsServerId() == rsId) |
| | | { |
| | | /* |
| | | If we are computing connected DSs for the RS we are connected |
| | |
| | | { |
| | | int rsId = rsInfo.getId(); |
| | | rssToKeep.add(rsId); // Mark this server as still existing |
| | | List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList); |
| | | final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList); |
| | | ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId); |
| | | if (rsInfo2 == null) |
| | | { |
| | | // New replication server, create info for it add it to the list |
| | | rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs); |
| | | // Set the locally configured flag for this new RS only if it is |
| | | // configured |
| | | updateRSInfoLocallyConfiguredStatus(rsInfo2); |
| | | setLocallyConfiguredFlag(rsInfo2); |
| | | replicationServerInfos.put(rsId, rsInfo2); |
| | | } else |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Now remove any replication server that may have disappeared from the |
| | | * topology. |
| | | */ |
| | | Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator(); |
| | | while (rsInfoIt.hasNext()) |
| | | { |
| | | final Integer rsId = rsInfoIt.next(); |
| | | if (!rssToKeep.contains(rsId)) |
| | | { |
| | | // This replication server has quit the topology, remove it from the |
| | | // list |
| | | rsInfoIt.remove(); |
| | | } |
| | | } |
| | | // Remove any replication server that may have disappeared from the topology |
| | | replicationServerInfos.keySet().retainAll(rssToKeep); |
| | | |
| | | if (domain != null) |
| | | { |
| | | for (DSInfo info : dsList) |
| | |
| | | .append(getServerId()).append("\",") |
| | | .append(" groupId=").append(getGroupId()) |
| | | .append(", genId=").append(generationID) |
| | | .append(", connected=").append(connected).append(", "); |
| | | if (rsServerId == -1) |
| | | { |
| | | sb.append("no RS"); |
| | | } |
| | | else |
| | | { |
| | | sb.append("bestRS(serverId=").append(rsServerId) |
| | | .append(", serverUrl=").append(rsServerUrl) |
| | | .append(", groupId=").append(rsGroupId) |
| | | .append(")"); |
| | | } |
| | | .append(", "); |
| | | connectedRS.get().toString(sb); |
| | | return sb.toString(); |
| | | } |
| | | |