| | |
| | | import static org.opends.server.replication.server.ReplicationServer.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** The broker for Multi-master Replication. */ |
| | | public class ReplicationBroker |
| | | { |
| | | |
| | | /** |
| | | * Immutable class containing information about whether the broker is |
| | | * connected to an RS and data associated to this connected RS. |
| | |
| | | return session != null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | .append(")"); |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | private volatile boolean shutdown; |
| | | private final Object startStopLock = new Object(); |
| | |
| | | */ |
| | | private volatile boolean connectionError; |
| | | private final Object connectPhaseLock = new Object(); |
| | | /** The thread that publishes messages to the RS containing the current change time of this DS. */ |
| | | private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; |
| | | /* Properties for the last topology info received from the network. */ |
| | | /** Contains the last known state of the replication topology. */ |
| | | private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology()); |
| | | @GuardedBy("this") |
| | |
| | | registerReplicationMonitor(); |
| | | } |
| | | |
| | | /** Start the ReplicationBroker. */ |
| | | public void start() |
| | | { |
| | | synchronized (startStopLock) |
| | |
| | | private final short protocolVersion; |
| | | private final DN baseDN; |
| | | private final int windowSize; |
| | | /** @NotNull */ |
| | | private final ServerState serverState; |
| | | private final boolean sslEncryption; |
| | | private final int degradedStatusThreshold; |
| | | /** Keeps the 0 value if created with a ReplServerStartMsg. */ |
| | | private int connectedDSNumber; |
| | | /** @NotNull */ |
| | | private Set<Integer> connectedDSs; |
| | | /** Is this RS locally configured? (the RS is recognized as a usable server). */ |
| | | private boolean locallyConfigured = true; |
| | | |
| | | /** |
| | |
| | | */ |
| | | domain.toNotConnectedStatus(); |
| | | |
| | | /* Stop any existing heartbeat monitor and changeTime publisher from a previous session. */ |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake (start |
| | | * messages exchange) and return the reply message from the replication |
| | |
| | | return NOTE_UNKNOWN_RS.get(rsServerId, localServerId); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** Evaluation local to one filter. */ |
| | | private static class LocalEvaluation |
| | | { |
| | | private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>(); |
| | |
| | | { |
| | | return !accepted.isEmpty(); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | return evals; |
| | | } |
| | | |
| | | /* Now apply the choice based on the weight to the best servers list. */ |
| | | if (firstConnection) |
| | | { |
| | | // We are not connected to a server yet |
| | |
| | | return idx != -1 && idx < overloadingDSsNumber; |
| | | } |
| | | |
| | | /** Start the heartbeat monitor thread. */ |
| | | private void startRSHeartBeatMonitoring(ConnectedRS rs) |
| | | { |
| | | final long heartbeatInterval = config.getHeartbeatInterval(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** Stop the heartbeat monitor thread. */ |
| | | private synchronized void stopRSHeartBeatMonitoring() |
| | | { |
| | | if (heartbeatMonitor != null) |
| | |
| | | while (!shutdown) |
| | | { |
| | | ConnectedRS rs = connectedRS.get(); |
| | | if (reconnectOnFailure && !rs.isConnected()) |
| | | if (!rs.isConnected()) |
| | | { |
| | | // infinite try to reconnect |
| | | reStart(null, true); |
| | | continue; |
| | | } |
| | | |
| | | // Save session information for later in case we need it for log messages |
| | | // after the session has been closed and/or failed. |
| | | if (rs.session == null) |
| | | { |
| | | // Must be shutting down. |
| | | break; |
| | | if (reconnectOnFailure) |
| | | { |
| | | // infinite try to reconnect |
| | | reStart(null, true); |
| | | continue; |
| | | } |
| | | else |
| | | { |
| | | // Must be shutting down. |
| | | break; |
| | | } |
| | | } |
| | | |
| | | final int serverId = getServerId(); |
| | |
| | | reStart(rs.session, true); |
| | | } |
| | | } |
| | | } // while !shutdown |
| | | } |
| | | return null; |
| | | } |
| | | |
| | |
| | | public short getProtocolVersion() |
| | | { |
| | | // Report the session's protocol version when a session exists; |
| | | // otherwise fall back to the current protocol version. |
| | | final Session session = connectedRS.get().session; |
| | | return session != null ? session.getProtocolVersion() : ProtocolVersion.getCurrentVersion(); |
| | | } |
| | | |
| | | /** |
| | |
| | | return newTopo; |
| | | } |
| | | |
| | | /** Contains the last known state of the replication topology. */ |
| | | static final class Topology |
| | | { |
| | | |
| | | /** The RS's serverId that this DS was connected to when this topology state was computed. */ |
| | | private final int rsServerId; |
| | | /** |
| | | * Info for other DSs. |
| | |
| | | rsInfo.setLocallyConfigured(false); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean equals(Object obj) |
| | | { |
| | |
| | | return true; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | |
| | | return result; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | return connectionError; |
| | | } |
| | | |
| | | /** Starts publishing to the RS the current timestamp used in this server. */ |
| | | private void startChangeTimeHeartBeatPublishing(ConnectedRS rs) |
| | | { |
| | | // Start a CSN heartbeat thread. |
| | |
| | | threadName, rs.session, changeTimeHeartbeatInterval, getServerId()); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } |
| | | else |
| | | else if (logger.isTraceEnabled()) |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debugInfo("is not configured to send CSN heartbeat interval"); |
| | | } |
| | | debugInfo("is not configured to send CSN heartbeat interval"); |
| | | } |
| | | } |
| | | |
| | | /** Stops publishing to the RS the current timestamp used in this server. */ |
| | | private synchronized void stopChangeTimeHeartBeatPublishing() |
| | | { |
| | | if (ctHeartbeatPublisherThread != null) |
| | |
| | | private ConnectedRS setConnectedRS(final ConnectedRS newRS) |
| | | { |
| | | final ConnectedRS oldRS = connectedRS.getAndSet(newRS); |
| | | if (!oldRS.equals(newRS) && oldRS.session != null) |
| | | if (!oldRS.equals(newRS) && oldRS.isConnected()) |
| | | { |
| | | // monitor name is changing, deregister before registering again |
| | | deregisterReplicationMonitor(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |