| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.ArrayList; |
| | | import java.util.Map; |
| | | import java.util.Random; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachine; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.HeartbeatThread; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplServerStartMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg; |
| | | import org.opends.server.replication.protocol.StartMsg; |
| | | import org.opends.server.replication.protocol.StartSessionMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.WindowMsg; |
| | | import org.opends.server.replication.protocol.WindowProbeMsg; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | | * peer server (RS or DS). |
| | | * This class defines a server handler : |
| | | * - that is a MessageHandler (see this class for more details) |
| | | * - that handles all interaction with a peer server (RS or DS). |
| | | */ |
| | | public class ServerHandler extends MonitorProvider<MonitorProviderCfg> |
| | | public abstract class ServerHandler extends MessageHandler |
| | | { |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | /** |
| | | * Time during which the server will wait for existing thread to stop |
| | | * during the shutdownWriter. |
| | | */ |
| | | private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; |
| | | |
| | | /* |
| | | * Properties, filled if remote server is either a DS or a RS |
| | | /** |
| | | * Close the session and log the provided error message |
| | | * Log nothing if message is null. |
| | | * @param providedSession The provided closing session. |
| | | * @param providedMsg The provided error message. |
| | | * @param handler The handler that manages that session. |
| | | */ |
| | | private short serverId; |
| | | private ProtocolSession session; |
| | | private final MsgQueue msgQueue = new MsgQueue(); |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private ReplicationServerDomain replicationServerDomain = null; |
| | | private String serverURL; |
| | | static protected void closeSession(ProtocolSession providedSession, |
| | | Message providedMsg, ServerHandler handler) |
| | | { |
| | | // A null message means nothing is traced or logged before closing. |
| | | if (providedMsg != null) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | // Name the closing party: the handler when one is given, otherwise |
| | | // the generic "Replication Server" label. |
| | | String closer = |
| | | (handler == null) ? "Replication Server" : handler.toString(); |
| | | TRACER.debugInfo( |
| | | "In " + closer + " closing session with err=" + |
| | | providedMsg.toString()); |
| | | } |
| | | logError(providedMsg); |
| | | } |
| | | |
| | | if (providedSession == null) |
| | | { |
| | | // No session was provided, so there is nothing to close. |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | providedSession.close(); |
| | | } |
| | | catch (IOException ee) |
| | | { |
| | | // ignore |
| | | } |
| | | } |
| | | |
| | | // Number of updates sent to the server |
| | | private int outCount = 0; |
| | | // Number of updates received from the server |
| | | private int inCount = 0; |
| | | /** |
| | | * The serverId of the remote server. |
| | | */ |
| | | protected short serverId; |
| | | /** |
| | | * The session opened with the remote server. |
| | | */ |
| | | protected ProtocolSession session; |
| | | |
| | | // Number of updates received from the server in assured safe read mode |
| | | private int assuredSrReceivedUpdates = 0; |
| | | // Number of updates received from the server in assured safe read mode that |
| | | // timed out |
| | | private AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); |
| | | // Number of updates sent to the server in assured safe read mode |
| | | private int assuredSrSentUpdates = 0; |
| | | // Number of updates sent to the server in assured safe read mode that timed |
| | | // out |
| | | private AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); |
| | | // Number of updates received from the server in assured safe data mode |
| | | private int assuredSdReceivedUpdates = 0; |
| | | // Number of updates received from the server in assured safe data mode that |
| | | // timed out |
| | | private AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); |
| | | // Number of updates sent to the server in assured safe data mode |
| | | private int assuredSdSentUpdates = 0; |
| | | // Number of updates sent to the server in assured safe data mode that timed |
| | | // out |
| | | private AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); |
| | | /** |
| | | * The serverURL of the remote server. |
| | | */ |
| | | protected String serverURL; |
| | | /** |
| | | * Number of updates received from the server in assured safe read mode. |
| | | */ |
| | | protected int assuredSrReceivedUpdates = 0; |
| | | /** |
| | | * Number of updates received from the server in assured safe read mode that |
| | | * timed out. |
| | | */ |
| | | protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); |
| | | /** |
| | | * Number of updates sent to the server in assured safe read mode. |
| | | */ |
| | | protected int assuredSrSentUpdates = 0; |
| | | /** |
| | | * Number of updates sent to the server in assured safe read mode that timed |
| | | * out. |
| | | */ |
| | | protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); |
| | | /** |
| | | * Number of updates received from the server in assured safe data mode. |
| | | */ |
| | | protected int assuredSdReceivedUpdates = 0; |
| | | /** |
| | | * Number of updates received from the server in assured safe data mode that |
| | | * timed out. |
| | | */ |
| | | protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); |
| | | /** |
| | | * Number of updates sent to the server in assured safe data mode. |
| | | */ |
| | | protected int assuredSdSentUpdates = 0; |
| | | |
| | | private int maxReceiveQueue = 0; |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private int maxQueueSize = 5000; |
| | | private int maxQueueBytesSize = maxQueueSize * 100; |
| | | private int restartReceiveQueue; |
| | | private int restartSendQueue; |
| | | private int restartReceiveDelay; |
| | | private int restartSendDelay; |
| | | private boolean serverIsLDAPserver; |
| | | private boolean following = false; |
| | | private ServerState serverState; |
| | | private boolean activeWriter = true; |
| | | private ServerWriter writer = null; |
| | | private String baseDn = null; |
| | | /** |
| | | * Number of updates sent to the server in assured safe data mode that timed |
| | | * out. |
| | | */ |
| | | protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); |
| | | |
| | | /** |
| | | * The associated ServerReader that receives messages from the remote server. |
| | | */ |
| | | protected ServerReader reader; |
| | | /** |
| | | * The associated ServerWriter that sends messages to the remote server. |
| | | */ |
| | | protected ServerWriter writer = null; |
| | | |
| | | // window |
| | | private int rcvWindow; |
| | | private int rcvWindowSizeHalf; |
| | | |
| | | private int maxRcvWindow; |
| | | private ServerReader reader; |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | private boolean flowControl = false; // indicate that the server is |
| | | // flow controlled and should |
| | | // be stopped from sending messages. |
| | | /** |
| | | * Semaphore that the writer uses to control the flow to the remote server. |
| | | */ |
| | | protected Semaphore sendWindow; |
| | | /** |
| | | * The initial size of the sending window. |
| | | */ |
| | | int sendWindowSize; |
| | | |
| | | private int saturationCount = 0; |
| | | private short replicationServerId; |
| | | private short protocolVersion = -1; |
| | | private long generationId = -1; |
| | | |
| | | // Group id of this remote server |
| | | private byte groupId = (byte) -1; |
| | | |
| | | /* |
| | | * Properties filled only if remote server is a DS |
| | | */ |
| | | |
| | | // Status of this DS (only used if this server handler represents a DS) |
| | | private ServerStatus status = ServerStatus.INVALID_STATUS; |
| | | // Referrals URLs this DS is exporting |
| | | private List<String> refUrls = new ArrayList<String>(); |
| | | // Assured replication enabled on DS or not |
| | | private boolean assuredFlag = false; |
| | | // DS assured mode (relevant if assured replication enabled) |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // DS safe data level (relevant if assured mode is safe data) |
| | | private byte safeDataLevel = (byte) -1; |
| | | |
| | | /* |
| | | * Properties filled only if remote server is a RS |
| | | */ |
| | | private String serverAddressURL; |
| | | /** |
| | | * When this Handler is related to a remote replication server |
| | | * this collection will contain as many elements as there are |
| | | * LDAP servers connected to the remote replication server. |
| | | * The protocol version established with the remote server. |
| | | */ |
| | | private final Map<Short, LightweightServerHandler> directoryServers = |
| | | new ConcurrentHashMap<Short, LightweightServerHandler>(); |
| | | protected short protocolVersion = -1; |
| | | /** |
| | | * remote generation id. |
| | | */ |
| | | protected long generationId = -1; |
| | | /** |
| | | * The generation id of the hosting RS. |
| | | */ |
| | | protected long localGenerationId = -1; |
| | | /** |
| | | * The generation id before processing a new start handshake. |
| | | */ |
| | | protected long oldGenerationId = -1; |
| | | /** |
| | | * Group id of this remote server. |
| | | */ |
| | | protected byte groupId = (byte) -1; |
| | | /** |
| | | * The SSL encryption provided by the creator/starter of this handler. |
| | | */ |
| | | protected boolean initSslEncryption; |
| | | |
| | | /** |
| | | * The SSL encryption after the negotiation with the peer. |
| | | */ |
| | | protected boolean sslEncryption; |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | protected long heartbeatInterval = 0; |
| | | |
| | | /** |
| | | * The thread that will send heartbeats. |
| | | */ |
| | | HeartbeatThread heartbeatThread = null; |
| | | |
| | | /** |
| | | * Set when ServerWriter is stopping. |
| | | */ |
| | | private boolean shutdownWriter = false; |
| | | protected boolean shutdownWriter = false; |
| | | |
| | | /** |
| | | * Set when ServerHandler is stopping. |
| | | */ |
| | | private AtomicBoolean shuttingDown = new AtomicBoolean(false); |
| | | |
| | | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | | * |
| | | * @param session The ProtocolSession used to |
| | | * communicate with the remote entity. |
| | | * @param queueSize The maximum number of update that will be kept |
| | | * in memory by this ServerHandler. |
| | | * @param replicationServerURL The URL of the hosting replication server. |
| | | * @param replicationServerId The serverId of the hosting replication server. |
| | | * @param replicationServer The hosting replication server. |
| | | * @param rcvWindowSize The window size to receive from the remote server. |
| | | */ |
| | | public ServerHandler(ProtocolSession session, int queueSize) |
| | | public ServerHandler( |
| | | ProtocolSession session, |
| | | int queueSize, |
| | | String replicationServerURL, |
| | | short replicationServerId, |
| | | ReplicationServer replicationServer, |
| | | int rcvWindowSize) |
| | | { |
| | | super("Server Handler"); |
| | | super(queueSize, replicationServerURL, |
| | | replicationServerId, replicationServer); |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | this.maxQueueBytesSize = queueSize * 100; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.rcvWindowSizeHalf = rcvWindowSize / 2; |
| | | this.maxRcvWindow = rcvWindowSize; |
| | | this.rcvWindow = rcvWindowSize; |
| | | } |
| | | |
| | | /** |
| | | * Creates a DSInfo structure representing this remote DS. |
| | | * @return The DSInfo structure representing this remote DS |
| | | * Abort a start procedure currently establishing. |
| | | * @param reason The provided reason. |
| | | */ |
| | | public DSInfo toDSInfo() |
| | | protected void abortStart(Message reason) |
| | | { |
| | | DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId, |
| | | status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls); |
| | | |
| | | return dsInfo; |
| | | } |
| | | |
| | | /** |
| | | * Creates a RSInfo structure representing this remote RS. |
| | | * @return The RSInfo structure representing this remote RS |
| | | */ |
| | | public RSInfo toRSInfo() |
| | | { |
| | | RSInfo rsInfo = new RSInfo(serverId, generationId, groupId); |
| | | |
| | | return rsInfo; |
| | | } |
| | | |
| | | /** |
| | | * Do the handshake with either the DS or RS and then create the reader and |
| | | * writer thread. |
| | | * |
| | | * There are 2 possible handshake sequences: DS<->RS and RS<->RS. Each one is |
| | | * divided into 2 logical consecutive phases (phase 1 and phase 2): |
| | | * |
| | | * DS<->RS (DS (always initiating connection) always sends first message): |
| | | * ------- |
| | | * |
| | | * phase 1: |
| | | * DS --- ServerStartMsg ---> RS |
| | | * DS <--- ReplServerStartMsg --- RS |
| | | * phase 2: |
| | | * DS --- StartSessionMsg ---> RS |
| | | * DS <--- TopologyMsg --- RS |
| | | * |
| | | * RS<->RS (RS initiating connection always sends first message): |
| | | * ------- |
| | | * |
| | | * phase 1: |
| | | * RS1 --- ReplServerStartMsg ---> RS2 |
| | | * RS1 <--- ReplServerStartMsg --- RS2 |
| | | * phase 2: |
| | | * RS1 --- TopologyMsg ---> RS2 |
| | | * RS1 <--- TopologyMsg --- RS2 |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection (listen). |
| | | * @param replicationServerId The identifier of the replicationServer that |
| | | * creates this server handler. |
| | | * @param replicationServerURL The URL of the replicationServer that creates |
| | | * this server handler. |
| | | * @param windowSize the window size that this server handler must use. |
| | | * @param sslEncryption For outgoing connections indicates whether encryption |
| | | * should be used after the exchange of start messages. |
| | | * Ignored for incoming connections. |
| | | * @param replicationServer the ReplicationServer that created this server |
| | | * handler. |
| | | */ |
| | | public void start(String baseDn, short replicationServerId, |
| | | String replicationServerURL, |
| | | int windowSize, boolean sslEncryption, |
| | | ReplicationServer replicationServer) |
| | | { |
| | | |
| | | // The handshake phase must be done by blocking any access to structures |
| | | // keeping info on connected servers, so that one can safely check for |
| | | // pre-existence of a server, send a coherent snapshot of known topology |
| | | // to peers, update the local view of the topology... |
| | | // |
| | | // For instance a kind of problem could be that while we connect with a |
| | | // peer RS, a DS is connecting at the same time and we could publish the |
| | | // connected DSs to the peer RS forgetting this last DS in the TopologyMsg. |
| | | // |
| | | // This method and every others that need to read/make changes to the |
| | | // structures holding topology for the domain should: |
| | | // - call ReplicationServerDomain.lock() |
| | | // - read/modify structures |
| | | // - call ReplicationServerDomain.release() |
| | | // |
| | | // More information is provided in comment of ReplicationServerDomain.lock() |
| | | |
| | | // If domain already exists, lock it until handshake is finished otherwise |
| | | // it will be created and locked later in the method |
| | | if (baseDn != null) |
| | | // We did not recognize the message, close session as what |
| | | // can happen after is undetermined and we do not want the server to |
| | | // be disturbed |
| | | if (session!=null) |
| | | { |
| | | ReplicationServerDomain rsd = |
| | | replicationServer.getReplicationServerDomain(baseDn, false); |
| | | if (rsd != null) |
| | | try |
| | | { |
| | | try |
| | | { |
| | | rsd.lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | session.publish( |
| | | new ErrorMsg( |
| | | replicationServerDomain.getReplicationServer().getServerId(), |
| | | serverId, |
| | | reason)); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | } |
| | | closeSession(session, reason, this); |
| | | } |
| | | |
| | | long oldGenerationId = -100; |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + |
| | | " starts a new LS or RS " + |
| | | ((baseDn == null) ? "incoming connection" : "outgoing connection")); |
| | | |
| | | this.replicationServerId = replicationServerId; |
| | | rcvWindowSizeHalf = windowSize / 2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | long localGenerationId = -1; |
| | | ReplServerStartMsg outReplServerStartMsg = null; |
| | | |
| | | /** |
| | | * This boolean prevents logging a polluting error when the connection is |
| | | * aborted by a DS that wanted only to perform handshake phase 1 in order |
| | | * to determine the best suitable RS: |
| | | * 1) -> ServerStartMsg |
| | | * 2) <- ReplServerStartMsg |
| | | * 3) connection closure |
| | | */ |
| | | boolean log_error_message = true; |
| | | |
| | | try |
| | | // If generation id of domain was changed, set it back to old value |
| | | // We may have changed it as it was -1 and we received a value >0 from |
| | | // peer server and the last topo message sent may have failed being |
| | | // sent: in that case retrieve old value of generation id for |
| | | // replication server domain |
| | | if (oldGenerationId != -100) |
| | | { |
| | | /* |
| | | * PROCEDE WITH FIRST PHASE OF HANDSHAKE: |
| | | * ServerStartMsg then ReplServerStartMsg (with a DS) |
| | | * OR |
| | | * ReplServerStartMsg then ReplServerStartMsg (with a RS) |
| | | */ |
| | | replicationServerDomain.setGenerationId(oldGenerationId, false); |
| | | } |
| | | } |
| | | |
| | | if (baseDn != null) // Outgoing connection |
| | | |
| | | /** |
| | | * Check the protocol window and send WindowMsg if necessary. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void checkWindow() throws IOException |
| | | { |
| | | if (rcvWindow < rcvWindowSizeHalf) |
| | | { |
| | | if (flowControl) |
| | | { |
| | | // This is an outgoing connection. Publish our start message. |
| | | this.baseDn = baseDn; |
| | | |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(baseDn, true); |
| | | if (!replicationServerDomain.hasLock()) |
| | | if (replicationServerDomain.restartAfterSaturation(this)) |
| | | { |
| | | try |
| | | { |
| | | replicationServerDomain.lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | flowControl = false; |
| | | } |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | } |
| | | if (!flowControl) |
| | | { |
| | | WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | | } |
| | | |
| | | ServerState localServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | outReplServerStartMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, |
| | | baseDn, windowSize, localServerState, |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption, |
| | | replicationServer.getGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | /** |
| | | * Consume one slot of the receive window, then let checkWindow() decide |
| | | * whether a WindowMsg has to be sent. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void decAndCheckWindow() throws IOException |
| | | { |
| | | rcvWindow -= 1; |
| | | checkWindow(); |
| | | } |
| | | |
| | | session.publish(outReplServerStartMsg); |
| | | /** |
| | | * Raise the shut down flag and report the state it had before this call. |
| | | * @return The previous value of the shut down flag |
| | | */ |
| | | public boolean engageShutdown() |
| | | { |
| | | // Atomic swap: the flag is read and set to true in a single step. |
| | | final boolean wasShuttingDown = shuttingDown.getAndSet(true); |
| | | return wasShuttingDown; |
| | | } |
| | | |
| | | /** |
| | | * Finalize the initialization, create reader, writer, heartbeat system |
| | | * and monitoring system. |
| | | * @throws DirectoryException When an exception is raised. |
| | | */ |
| | | protected void finalizeStart() |
| | | throws DirectoryException |
| | | { |
| | | // FIXME:ECL We should refactor so that a SH always have a session |
| | | if (session != null) |
| | | { |
| | | try |
| | | { |
| | | // Disable timeout for next communications |
| | | session.setSoTimeout(0); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | } |
| | | |
| | | // Wait and process ServerStartMsg or ReplServerStartMsg |
| | | ReplicationMsg msg = session.receive(); |
| | | if (msg instanceof ServerStartMsg) |
| | | { |
| | | // The remote server is an LDAP Server. |
| | | ServerStartMsg serverStartMsg = (ServerStartMsg) msg; |
| | | |
| | | generationId = serverStartMsg.getGenerationId(); |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | serverStartMsg.getVersion()); |
| | | serverId = serverStartMsg.getServerId(); |
| | | serverURL = serverStartMsg.getServerURL(); |
| | | this.baseDn = serverStartMsg.getBaseDn(); |
| | | this.serverState = serverStartMsg.getServerState(); |
| | | this.groupId = serverStartMsg.getGroupId(); |
| | | |
| | | maxReceiveDelay = serverStartMsg.getMaxReceiveDelay(); |
| | | maxReceiveQueue = serverStartMsg.getMaxReceiveQueue(); |
| | | maxSendDelay = serverStartMsg.getMaxSendDelay(); |
| | | maxSendQueue = serverStartMsg.getMaxSendQueue(); |
| | | heartbeatInterval = serverStartMsg.getHeartbeatInterval(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | | sslEncryption = serverStartMsg.getSSLEncryption(); |
| | | |
| | | if (maxReceiveQueue > 0) |
| | | restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue - |
| | | 200 : maxReceiveQueue * 8 / 10); |
| | | else |
| | | restartReceiveQueue = 0; |
| | | |
| | | if (maxSendQueue > 0) |
| | | restartSendQueue = |
| | | (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 / |
| | | 10); |
| | | else |
| | | restartSendQueue = 0; |
| | | |
| | | if (maxReceiveDelay > 0) |
| | | restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1 |
| | | : maxReceiveDelay); |
| | | else |
| | | restartReceiveDelay = 0; |
| | | |
| | | if (maxSendDelay > 0) |
| | | restartSendDelay = |
| | | (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay); |
| | | else |
| | | restartSendDelay = 0; |
| | | |
| | | if (heartbeatInterval < 0) |
| | | { |
| | | heartbeatInterval = 0; |
| | | } |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | // Get or Create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | |
| | | // Hack to be sure that if a server disconnects and reconnect, we |
| | | // let the reader thread see the closure and cleanup any reference |
| | | // to old connection |
| | | replicationServerDomain.waitDisconnection(serverStartMsg.getServerId()); |
| | | |
| | | if (!replicationServerDomain.hasLock()) |
| | | { |
| | | try |
| | | { |
| | | replicationServerDomain.lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // Duplicate server ? |
| | | if (!replicationServerDomain.checkForDuplicateDS(this)) |
| | | { |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | ServerState localServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | // This an incoming connection. Publish our start message |
| | | ReplServerStartMsg replServerStartMsg = |
| | | new ReplServerStartMsg(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState, |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption, |
| | | replicationServer.getGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | session.publish(replServerStartMsg); |
| | | sendWindowSize = serverStartMsg.getWindowSize(); |
| | | |
| | | /* Until here session is encrypted then it depends on the |
| | | negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + serverStartMsg.toString() + |
| | | "\nAND REPLIED:\n" + replServerStartMsg.toString()); |
| | | } |
| | | } else if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | // The remote server is a replication server |
| | | ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) msg; |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | inReplServerStartMsg.getVersion()); |
| | | generationId = inReplServerStartMsg.getGenerationId(); |
| | | serverId = inReplServerStartMsg.getServerId(); |
| | | serverURL = inReplServerStartMsg.getServerURL(); |
| | | int separator = serverURL.lastIndexOf(':'); |
| | | serverAddressURL = |
| | | session.getRemoteAddress() + ":" + serverURL.substring(separator + |
| | | 1); |
| | | serverIsLDAPserver = false; |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | // We support connection from a V1 RS |
| | | // Only V2 protocol has the group id in repl server start message |
| | | this.groupId = inReplServerStartMsg.getGroupId(); |
| | | } |
| | | this.baseDn = inReplServerStartMsg.getBaseDn(); |
| | | |
| | | if (baseDn == null) // Reply to incoming RS |
| | | |
| | | { |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | if (!replicationServerDomain.hasLock()) |
| | | { |
| | | try |
| | | { |
| | | /** |
| | | * Take the lock on the domain. |
| | | * WARNING: Here we try to acquire the lock with a timeout. This |
| | | * is for preventing a deadlock that may happen if there are cross |
| | | * connection attempts (for same domain) from this replication |
| | | * server and from a peer one: |
| | | * Here is the scenario: |
| | | * - RS1 connect thread takes the domain lock and starts |
| | | * connection to RS2 |
| | | * - at the same time RS2 connect thread takes its domain lock and |
| | | * starts connection to RS1 |
| | | * - RS2 listen thread starts processing received |
| | | * ReplServerStartMsg from RS1 and wants to acquire the lock on |
| | | * the domain (here) but cannot as RS2 connect thread already has |
| | | * it |
| | | * - RS1 listen thread starts processing received |
| | | * ReplServerStartMsg from RS2 and wants to acquire the lock on |
| | | * the domain (here) but cannot as RS1 connect thread already has |
| | | * it |
| | | * => Deadlock: 4 threads are locked. |
| | | * So to prevent that in such situation, the listen threads here |
| | | * will both timeout trying to acquire the lock. The random time |
| | | * for the timeout should allow one connection attempt to be |
| | | * aborted whereas the other one should have time to finish in the |
| | | * same time. |
| | | * Warning: the minimum time (3s) should be big enough to allow |
| | | * normal situation connections to terminate. The added random |
| | | * time should represent a big enough range so that the chance to |
| | | * have one listen thread timing out a lot before the peer one is |
| | | * great. When the first listen thread times out, the remote |
| | | * connect thread should release the lock and allow the peer |
| | | * listen thread to take the lock it was waiting for and process |
| | | * the connection attempt. |
| | | */ |
| | | Random random = new Random(); |
| | | int randomTime = random.nextInt(6); // Random from 0 to 5 |
| | | // Wait at least 3 seconds + (0 to 5 seconds) |
| | | long timeout = (long) (3000 + ( randomTime * 1000 ) ); |
| | | boolean noTimeout = replicationServerDomain.tryLock(timeout); |
| | | if (!noTimeout) |
| | | { |
| | | // Timeout |
| | | Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Short.toString(replicationServer.getServerId())); |
| | | closeSession(message); |
| | | return; |
| | | } |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | } |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | ServerState domServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | | sslEncryption = inReplServerStartMsg.getSSLEncryption(); |
| | | |
| | | // Publish our start message |
| | | outReplServerStartMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, domServerState, |
| | | protocolVersion, |
| | | localGenerationId, |
| | | sslEncryption, |
| | | replicationServer.getGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | session.publish(outReplServerStartMsg); |
| | | } else { |
| | | // We support connection from a V1 RS, send PDU with V1 form |
| | | session.publish(outReplServerStartMsg, |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V1); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + inReplServerStartMsg.toString() + |
| | | "\nAND REPLIED:\n" + outReplServerStartMsg.toString()); |
| | | } |
| | | } else |
| | | { |
| | | // Did the remote RS answer with the DN we provided him ? |
| | | if (!(this.baseDn.equals(baseDn))) |
| | | { |
| | | Message message = ERR_RS_DN_DOES_NOT_MATCH.get( |
| | | this.baseDn.toString(), |
| | | baseDn.toString()); |
| | | closeSession(message); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE SENT:\n" + outReplServerStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + inReplServerStartMsg.toString()); |
| | | } |
| | | } |
| | | this.serverState = inReplServerStartMsg.getServerState(); |
| | | sendWindowSize = inReplServerStartMsg.getWindowSize(); |
| | | |
| | | // Duplicate server ? |
| | | if (!replicationServerDomain.checkForDuplicateRS(this)) |
| | | { |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | /* Until here session is encrypted then it depends on the |
| | | negociation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | } else |
| | | { |
| | | // We did not recognize the message, close session as what |
| | | // can happen after is undetermined and we do not want the server to |
| | | // be disturbed |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { // Only protocol version above V1 has a phase 2 handshake |
| | | |
| | | /* |
| | | * NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE: |
| | | * TopologyMsg then TopologyMsg (with a RS) |
| | | * OR |
| | | * StartSessionMsg then TopologyMsg (with a DS) |
| | | */ |
| | | |
| | | TopologyMsg outTopoMsg = null; |
| | | |
| | | if (baseDn != null) // Outgoing connection to a RS |
| | | |
| | | { |
| | | // Send our own TopologyMsg to remote RS |
| | | outTopoMsg = replicationServerDomain.createTopologyMsgForRS(); |
| | | session.publish(outTopoMsg); |
| | | } |
| | | |
| | | // Wait and process TopologyMsg or StartSessionMsg |
| | | log_error_message = false; |
| | | ReplicationMsg msg2 = session.receive(); |
| | | log_error_message = true; |
| | | if (msg2 instanceof TopologyMsg) |
| | | { |
| | | // Remote RS sent his topo msg |
| | | TopologyMsg inTopoMsg = (TopologyMsg) msg2; |
| | | |
| | | // CONNECTION WITH A RS |
| | | |
| | | // if the remote RS and the local RS have the same genID |
| | | // then it's ok and nothing else to do |
| | | if (generationId == localGenerationId) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | } |
| | | } else |
| | | { |
| | | if (localGenerationId > 0) |
| | | { |
| | | // if the local RS is initialized |
| | | if (generationId > 0) |
| | | { |
| | | // if the remote RS is initialized |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationServerDomain.getGenerationIdSavedStatus()) |
| | | { |
| | | // if the present RS has received changes regarding its |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } else |
| | | { |
| | | // The present RS has never received changes regarding its |
| | | // gen ID. |
| | | // |
| | | // Example case: |
| | | // - we are in RS1 |
| | | // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) |
| | | // - RS1 has genId1 from LS1 /genId1 comes from data in |
| | | // suffix |
| | | // - we are in RS1 and we receive a START msg from RS2 |
| | | // - Each RS keeps its genID / is degraded and when LS2 |
| | | // will be populated from LS1 everything will become ok. |
| | | // |
| | | // Issue: |
| | | // FIXME : Would it be a good idea in some cases to just |
| | | // set the gen ID received from the peer RS |
| | | // specially if the peer has a non null state and |
| | | // we have a nul state ? |
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | // The remote RS has no genId. We don't change anything for the |
| | | // current RS. |
| | | } |
| | | } else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | // WARNING: Must be done before computing topo message to send |
| | | // to peer server as topo message must embed valid generation id |
| | | // for our server |
| | | oldGenerationId = |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | |
| | | if (baseDn == null) // Reply to the RS (incoming connection) |
| | | |
| | | { |
| | | // Send our own TopologyMsg to remote RS |
| | | outTopoMsg = replicationServerDomain.createTopologyMsgForRS(); |
| | | session.publish(outTopoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() + |
| | | "\nAND REPLIED:\n" + outTopoMsg.toString()); |
| | | } |
| | | } else |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE SENT:\n" + outTopoMsg.toString() + |
| | | "\nAND RECEIVED:\n" + inTopoMsg.toString()); |
| | | } |
| | | } |
| | | |
| | | // Alright, connected with new RS (either outgoing or incoming |
| | | // connection): store handler. |
| | | Map<Short, ServerHandler> connectedRSs = |
| | | replicationServerDomain.getConnectedRSs(); |
| | | connectedRSs.put(serverId, this); |
| | | |
| | | // Process TopologyMsg sent by remote RS: store matching new info |
| | | // (this will also warn our connected DSs of the new received info) |
| | | replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false); |
| | | |
| | | } else if (msg2 instanceof StartSessionMsg) |
| | | { |
| | | // CONNECTION WITH A DS |
| | | |
| | | // Process StartSessionMsg sent by remote DS |
| | | StartSessionMsg startSessionMsg = (StartSessionMsg) msg2; |
| | | |
| | | this.status = startSessionMsg.getStatus(); |
| | | // Sanity check: is it a valid initial status? |
| | | if (!isValidInitialStatus(this.status)) |
| | | { |
| | | Message mesg = ERR_RS_INVALID_INIT_STATUS.get( |
| | | this.status.toString(), this.baseDn.toString(), |
| | | Short.toString(serverId)); |
| | | closeSession(mesg); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | this.refUrls = startSessionMsg.getReferralsURLs(); |
| | | this.assuredFlag = startSessionMsg.isAssured(); |
| | | this.assuredMode = startSessionMsg.getAssuredMode(); |
| | | this.safeDataLevel = startSessionMsg.getSafeDataLevel(); |
| | | |
| | | /* |
| | | * If we have already a generationID set for the domain |
| | | * then |
| | | * if the connecting replica has not the same |
| | | * then it is degraded locally and notified by an error message |
| | | * else |
| | | * we set the generationID from the one received |
| | | * (unsaved yet on disk . will be set with the 1rst change |
| | | * received) |
| | | */ |
| | | if (localGenerationId > 0) |
| | | { |
| | | if (generationId != localGenerationId) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } |
| | | } else |
| | | { |
| | | // We are an empty Replicationserver |
| | | if ((generationId > 0) && (!serverState.isEmpty())) |
| | | { |
| | | // If the LDAP server has already sent changes |
| | | // it is not expected to connect to an empty RS |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | // WARNING: Must be done before computing topo message to send |
| | | // to peer server as topo message must embed valid generation id |
| | | // for our server |
| | | oldGenerationId = |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | |
| | | // Send our own TopologyMsg to DS |
| | | outTopoMsg = replicationServerDomain.createTopologyMsgForDS( |
| | | this.serverId); |
| | | session.publish(outTopoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + startSessionMsg.toString() + |
| | | "\nAND REPLIED:\n" + outTopoMsg.toString()); |
| | | } |
| | | |
| | | // Alright, connected with new DS: store handler. |
| | | Map<Short, ServerHandler> connectedDSs = |
| | | replicationServerDomain.getConnectedDSs(); |
| | | connectedDSs.put(serverId, this); |
| | | |
| | | // Tell peer DSs a new DS just connected to us |
| | | // No need to resend topo msg to this just new DS so not null |
| | | // argument |
| | | replicationServerDomain.sendTopoInfoToDSs(this); |
| | | // Tell peer RSs a new DS just connected to us |
| | | replicationServerDomain.sendTopoInfoToRSs(); |
| | | } else |
| | | { |
| | | // We did not recognize the message, close session as what |
| | | // can happen after is undetermined and we do not want the server to |
| | | // be disturbed |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | } else |
| | | { |
| | | // Terminate connection from a V1 RS |
| | | |
| | | // if the remote RS and the local RS have the same genID |
| | | // then it's ok and nothing else to do |
| | | if (generationId == localGenerationId) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS V1 with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | } |
| | | } else |
| | | { |
| | | if (localGenerationId > 0) |
| | | { |
| | | // if the local RS is initialized |
| | | if (generationId > 0) |
| | | { |
| | | // if the remote RS is initialized |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationServerDomain.getGenerationIdSavedStatus()) |
| | | { |
| | | // if the present RS has received changes regarding its |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } else |
| | | { |
| | | // The present RS has never received changes regarding its |
| | | // gen ID. |
| | | // |
| | | // Example case: |
| | | // - we are in RS1 |
| | | // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) |
| | | // - RS1 has genId1 from LS1 /genId1 comes from data in |
| | | // suffix |
| | | // - we are in RS1 and we receive a START msg from RS2 |
| | | // - Each RS keeps its genID / is degraded and when LS2 |
| | | // will be populated from LS1 everything will become ok. |
| | | // |
| | | // Issue: |
| | | // FIXME : Would it be a good idea in some cases to just |
| | | // set the gen ID received from the peer RS |
| | | // specially if the peer has a non null state and |
| | | // we have a nul state ? |
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | // The remote RS has no genId. We don't change anything for the |
| | | // current RS. |
| | | } |
| | | } else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | oldGenerationId = |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | |
| | | // Alright, connected with new incoming V1 RS: store handler. |
| | | Map<Short, ServerHandler> connectedRSs = |
| | | replicationServerDomain.getConnectedRSs(); |
| | | connectedRSs.put(serverId, this); |
| | | |
| | | // Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1 |
| | | // all the servers of the topology. We prefer not to send a TopologyMsg |
| | | // here, as it would give partial/false information to the V2 servers: |
| | | // for instance we don't have the connected DS of the V1 RS. When the V1 |
| | | // RS is upgraded in its turn, topo info will be sent and accurate. |
| | | // That way, there is no risk to have false/incomplete information in |
| | | // other servers. |
| | | } |
| | | |
| | | /* |
| | | * FINALIZE INITIALIZATION: |
| | | * CREATE READER AND WRITER, HEARTBEAT SYSTEM AND UPDATE MONITORING |
| | | * SYSTEM |
| | | */ |
| | | |
| | | // Disable timeout for next communications |
| | | session.setSoTimeout(0); |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | this, replicationServerDomain); |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | this, replicationServerDomain); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatThread = new HeartbeatThread( |
| | | "Replication Heartbeat to DS " + serverURL + " " + serverId + |
| | | " for " + this.baseDn + " in RS " + replicationServerId, |
| | | session, heartbeatInterval / 3); |
| | | "Replication Heartbeat to " + this + |
| | | " in RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(), |
| | | session, heartbeatInterval / 3); |
| | | heartbeatThread.start(); |
| | | } |
| | | |
| | | // Create the status analyzer for the domain if not already started |
| | | if (serverIsLDAPserver) |
| | | { |
| | | if (!replicationServerDomain.isRunningStatusAnalyzer()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " is starting status analyzer"); |
| | | replicationServerDomain.startStatusAnalyzer(); |
| | | } |
| | | } |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } catch (NotSupportedOldVersionPDUException e) |
| | | { |
| | | // We do not need to support DS V1 connection, we just accept RS V1 |
| | | // connection: |
| | | // We just trash the message, log the event for debug purpose and close |
| | | // the connection |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ":" |
| | | + e.getMessage()); |
| | | closeSession(null); |
| | | } catch (Exception e) |
| | | { |
| | | // We do not want polluting error log if error is due to normal session |
| | | // aborted after handshake phase one from a DS that is searching for best |
| | | // suitable RS. |
| | | if ( log_error_message || (baseDn != null) ) |
| | | { |
| | | // some problem happened, reject the connection |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_REPLICATION_SERVER_CONNECTION_ERROR.get( |
| | | this.getMonitorInstanceName())); |
| | | mb.append(": " + stackTraceToSingleLineString(e)); |
| | | closeSession(mb.toMessage()); |
| | | } else |
| | | { |
| | | closeSession(null); |
| | | } |
| | | |
| | | // If generation id of domain was changed, set it back to old value |
| | | // We may have changed it as it was -1 and we received a value >0 from |
| | | // peer server and the last topo message sent may have failed being |
| | | // sent: in that case retrieve old value of generation id for |
| | | // replication server domain |
| | | if (oldGenerationId != -100) |
| | | { |
| | | replicationServerDomain.setGenerationId(oldGenerationId, false); |
| | | } |
| | | } |
| | | |
| | | // Release domain |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | } |
| | | |
| | | /* |
| | | * Close the session logging the passed error message |
| | | * Log nothing if message is null. |
| | | */ |
| | | private void closeSession(Message msg) |
| | | { |
| | | if (msg != null) |
| | | { |
| | | logError(msg); |
| | | } |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException ee) |
| | | { |
| | | // ignore |
| | | } |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | | |
| | | /** |
| | | * Returns the identifier of the server handled by this object. |
| | | * |
| | | * @return the ID of the server to which this object is linked |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return this.serverId; |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the Address URL for this server handler. |
| | | * |
| | | * @return The Address URL for this server handler, |
| | | * in the form of an IP address and port separated by a colon. |
| | | */ |
| | | public String getServerAddressURL() |
| | | { |
| | | return serverAddressURL; |
| | | } |
| | | |
| | | /** |
| | | * Sends a message containing a generationId to a peer server. |
| | | * The peer is expected to be a replication server. |
| | | * |
| | | * @param msg The GenerationIdMessage message to be sent. |
| | | * @throws IOException When it occurs while sending the message. |
| | | */ |
| | | public void forwardGenerationIdToRS(ResetGenerationIdMsg msg) |
| | | throws IOException |
| | | { |
| | | // Publish the message on the session opened with the peer. |
| | | session.publish(msg); |
| | | } |
| | | |
| | | /** |
| | | * Returns the URL of the server handled by this object. |
| | | * |
| | | * @return The URL for this server handler, in the form of an address and |
| | | * port separated by a colon. |
| | | */ |
| | | public String getServerURL() |
| | | { |
| | | return this.serverURL; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the counter of updates sent to the server. |
| | | */ |
| | | public void incrementOutCount() |
| | | { |
| | | outCount += 1; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the counter of updates received from the server. |
| | | */ |
| | | public void incrementInCount() |
| | | { |
| | | inCount += 1; |
| | | } |
| | | |
| | | /** |
| | | * Returns the current value of the counter of updates received from the |
| | | * server. |
| | | * |
| | | * @return the count of updates received from the server. |
| | | */ |
| | | public int getInCount() |
| | | { |
| | | return this.inCount; |
| | | } |
| | | |
| | | /** |
| | | * Returns the current value of the counter of updates sent to this server. |
| | | * |
| | | * @return The count of updates sent to this server. |
| | | */ |
| | | public int getOutCount() |
| | | { |
| | | return this.outCount; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates received from the server in assured safe |
| | | * read mode. |
| | | * |
| | | * @return The number of updates received from the server in assured safe |
| | | * read mode |
| | | */ |
| | | public int getAssuredSrReceivedUpdates() |
| | | { |
| | | return this.assuredSrReceivedUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates received from the server in assured safe |
| | | * read mode that timed out. |
| | | * |
| | | * @return The counter holding the number of updates received from the |
| | | * server in assured safe read mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSrReceivedUpdatesTimeout() |
| | | { |
| | | return this.assuredSrReceivedUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates sent to the server in assured safe read |
| | | * mode. |
| | | * |
| | | * @return The number of updates sent to the server in assured safe read mode |
| | | */ |
| | | public int getAssuredSrSentUpdates() |
| | | { |
| | | return this.assuredSrSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates sent to the server in assured safe read |
| | | * mode that timed out. |
| | | * |
| | | * @return The counter holding the number of updates sent to the server in |
| | | * assured safe read mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSrSentUpdatesTimeout() |
| | | { |
| | | return this.assuredSrSentUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates received from the server in assured safe |
| | | * data mode. |
| | | * |
| | | * @return The number of updates received from the server in assured safe |
| | | * data mode |
| | | */ |
| | | public int getAssuredSdReceivedUpdates() |
| | | { |
| | | return this.assuredSdReceivedUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates received from the server in assured safe |
| | | * data mode that timed out. |
| | | * |
| | | * @return The counter holding the number of updates received from the |
| | | * server in assured safe data mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSdReceivedUpdatesTimeout() |
| | | { |
| | | return this.assuredSdReceivedUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates sent to the server in assured safe data |
| | | * mode. |
| | | * |
| | | * @return The number of updates sent to the server in assured safe data mode |
| | | */ |
| | | public int getAssuredSdSentUpdates() |
| | | { |
| | | return this.assuredSdSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Returns the counter of updates sent to the server in assured safe data |
| | | * mode that timed out. |
| | | * |
| | | * @return The counter holding the number of updates sent to the server in |
| | | * assured safe data mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSdSentUpdatesTimeout() |
| | | { |
| | | return this.assuredSdSentUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates received from the server in assured |
| | | * safe read mode. |
| | | */ |
| | | public void incrementAssuredSrReceivedUpdates() |
| | | { |
| | | assuredSrReceivedUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates received from the server in assured |
| | | * safe read mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrReceivedUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSrReceivedUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates sent to the server in assured safe |
| | | * read mode. |
| | | */ |
| | | public void incrementAssuredSrSentUpdates() |
| | | { |
| | | assuredSrSentUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates sent to the server in assured safe |
| | | * read mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrSentUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSrSentUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates received from the server in assured |
| | | * safe data mode. |
| | | */ |
| | | public void incrementAssuredSdReceivedUpdates() |
| | | { |
| | | assuredSdReceivedUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates received from the server in assured |
| | | * safe data mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdReceivedUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSdReceivedUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates sent to the server in assured safe |
| | | * data mode. |
| | | */ |
| | | public void incrementAssuredSdSentUpdates() |
| | | { |
| | | assuredSdSentUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Adds one to the number of updates sent to the server in assured safe |
| | | * data mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdSentUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSdSentUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Tells whether this server is saturated, i.e. whether the queue of |
| | | * messages for this server has reached one of the configured limits. |
| | | * |
| | | * A limit is either a number of queued updates or a delay between the |
| | | * given change number and the first queued update. The receive limits of |
| | | * this handler and the send limits of the source handler are both checked; |
| | | * a limit that is not greater than 0 is not taken into account. |
| | | * |
| | | * @param changeNumber The changenumber to use to make the delay calculations. |
| | | * @param sourceHandler The ServerHandler which is sending the update. |
| | | * @return true if saturated, false if not saturated. |
| | | */ |
| | | public boolean isSaturated(ChangeNumber changeNumber, |
| | | ServerHandler sourceHandler) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | final int queued = msgQueue.count(); |
| | | |
| | | // Limits expressed as a number of queued updates. |
| | | if (((maxReceiveQueue > 0) && (queued >= maxReceiveQueue)) || |
| | | ((sourceHandler.maxSendQueue > 0) && |
| | | (queued >= sourceHandler.maxSendQueue))) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | // Limits expressed as a delay need a first update to compare with. |
| | | if (msgQueue.isEmpty()) |
| | | { |
| | | return false; |
| | | } |
| | | final UpdateMsg oldest = msgQueue.first(); |
| | | if (oldest == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | final long delay = changeNumber.getTimeSec() - |
| | | oldest.getChangeNumber().getTimeSec(); |
| | | return ((maxReceiveDelay > 0) && (delay >= maxReceiveDelay)) || |
| | | ((sourceHandler.maxSendDelay > 0) && |
| | | (delay >= sourceHandler.maxSendDelay)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the messages queue of this Server Handler has gone back |
| | | * below the restart limits, so that the reception of messages from other |
| | | * servers can restart. |
| | | * |
| | | * @param source The ServerHandler which was sending the update. |
| | | * can be null, in which case its limits are not checked. |
| | | * @return true if the processing can restart |
| | | */ |
| | | public boolean restartAfterSaturation(ServerHandler source) |
| | | { |
| | | final boolean hasSource = (source != null); |
| | | synchronized (msgQueue) |
| | | { |
| | | final int pending = msgQueue.count(); |
| | | |
| | | // Restart limits expressed as a number of queued updates. |
| | | if ((maxReceiveQueue > 0) && (pending >= restartReceiveQueue)) |
| | | { |
| | | return false; |
| | | } |
| | | if (hasSource && (source.maxSendQueue > 0) && |
| | | (pending >= source.restartSendQueue)) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | // Restart limits expressed as a delay between first and last update. |
| | | if (msgQueue.isEmpty()) |
| | | { |
| | | return true; |
| | | } |
| | | final UpdateMsg oldest = msgQueue.first(); |
| | | final UpdateMsg newest = msgQueue.last(); |
| | | if ((oldest == null) || (newest == null)) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | final long span = newest.getChangeNumber().getTimeSec() - |
| | | oldest.getChangeNumber().getTimeSec(); |
| | | final boolean receiveTooLate = |
| | | (maxReceiveDelay > 0) && (span >= restartReceiveDelay); |
| | | final boolean sendTooLate = hasSource && |
| | | (source.maxSendDelay > 0) && (span >= source.restartSendDelay); |
| | | return !(receiveTooLate || sendTooLate); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the server associated to this ServerHandler is a |
| | | * replication server, i.e. is not flagged as an LDAP server. |
| | | * |
| | | * @return true if the server associated to this ServerHandler is a |
| | | * replication server. |
| | | */ |
| | | public boolean isReplicationServer() |
| | | { |
| | | return !serverIsLDAPserver; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of message in the receive message queue. |
| | | * @return Size of the receive message queue. |
| | | */ |
| | | public int getRcvMsgQueueSize() |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | | * When the server is up to date or close to be up to date, |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (isFollowing()) |
| | | return msgQueue.count(); |
| | | else |
| | | { |
| | | /** |
| | | * When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The total size of the receive queue is calculated by doing |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | ServerState dbState = replicationServerDomain.getDbServerState(); |
| | | return ServerState.diffChanges(dbState, serverState); |
| | | } |
| | | } |
| | | session.publish(msg); |
| | | } |
| | | |
| | | /** |
| | | * Get the older update time for that server. |
| | | * |
| | | * @return The older update time, or 0 when no older change number is |
| | | * available. |
| | | */ |
| | | public long getOlderUpdateTime() |
| | | { |
| | | ChangeNumber olderUpdateCN = getOlderUpdateCN(); |
| | | if (olderUpdateCN == null) |
| | | { |
| | | // No older change number available. |
| | | return 0; |
| | | } |
| | | return olderUpdateCN.getTime(); |
| | | } |
| | | |
| | | /** |
| | | * Get the older Change Number for that server. |
| | | * Returns null when the queue is empty. |
| | | * @return The older change number. |
| | | * Get the number of updates received from the server in assured safe data |
| | | * mode that timed out. |
| | | * @return The number of updates received from the server in assured safe data |
| | | * mode that timed out. |
| | | */ |
| | | public ChangeNumber getOlderUpdateCN() |
| | | public AtomicInteger getAssuredSdReceivedUpdatesTimeout() |
| | | { |
| | | ChangeNumber result = null; |
| | | synchronized (msgQueue) |
| | | { |
| | | if (isFollowing()) |
| | | { |
| | | if (msgQueue.isEmpty()) |
| | | { |
| | | result = null; |
| | | } else |
| | | { |
| | | UpdateMsg msg = msgQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } else |
| | | { |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | // isFollowing is false AND lateQueue is empty |
| | | // We may be at the very moment when the writer has emptyed the |
| | | // lateQueue when it sent the last update. The writer will fill again |
| | | // the lateQueue when it will send the next update but we are not yet |
| | | // there. So let's take the last change not sent directly from |
| | | // the db. |
| | | |
| | | ReplicationIteratorComparator comparator = |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | try |
| | | { |
| | | // Build a list of candidates iterator (i.e. db i.e. server) |
| | | for (short serverId : replicationServerDomain.getServers()) |
| | | { |
| | | // get the last already sent CN from that server |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | // get an iterator in this server db from that last change |
| | | ReplicationIterator iterator = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | // if that iterator has changes, then it is a candidate |
| | | // it is added in the sorted list at a position given by its |
| | | // current change (see ReplicationIteratorComparator). |
| | | if ((iterator != null) && (iterator.getChange() != null)) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | } |
| | | UpdateMsg msg = iteratorSortedSet.first().getChange(); |
| | | result = msg.getChangeNumber(); |
| | | } catch (Exception e) |
| | | { |
| | | result = null; |
| | | } finally |
| | | { |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | UpdateMsg msg = lateQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | return assuredSdReceivedUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Check if the LDAP server can follow the speed of the other servers. |
| | | * @return true when the server has all the not yet sent changes |
| | | * in its queue. |
| | | * Get the number of updates sent to the server in assured safe data mode. |
| | | * @return The number of updates sent to the server in assured safe data mode |
| | | */ |
| | | public boolean isFollowing() |
| | | public int getAssuredSdSentUpdates() |
| | | { |
| | | return following; |
| | | return assuredSdSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Set the following flag of this server. |
| | | * @param following the value that should be set. |
| | | * Get the number of updates sent to the server in assured safe data mode that |
| | | * timed out. |
| | | * @return The number of updates sent to the server in assured safe data mode |
| | | * that timed out. |
| | | */ |
| | | public void setFollowing(boolean following) |
| | | public AtomicInteger getAssuredSdSentUpdatesTimeout() |
| | | { |
| | | this.following = following; |
| | | return assuredSdSentUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates received from the server in assured safe read |
| | | * mode. |
| | | * @return The number of updates received from the server in assured safe read |
| | | * mode. |
| | | */ |
| | | public int getAssuredSrReceivedUpdates() |
| | | { |
| | | return assuredSrReceivedUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Give access to the counter of assured safe read updates received from the |
| | | * server that timed out. |
| | | * @return The counter object itself (not a copy of its value) tracking the |
| | | * updates received from the server in assured safe read mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSrReceivedUpdatesTimeout() |
| | | { |
| | | return this.assuredSrReceivedUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Report how many updates were sent to the server in assured safe read mode. |
| | | * @return The current count of assured safe read updates sent to the server. |
| | | */ |
| | | public int getAssuredSrSentUpdates() |
| | | { |
| | | return this.assuredSrSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Give access to the counter of assured safe read updates sent to the server |
| | | * that timed out. |
| | | * @return The counter object itself (not a copy of its value) tracking the |
| | | * updates sent to the server in assured safe read mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSrSentUpdatesTimeout() |
| | | { |
| | | return this.assuredSrSentUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Returns the Replication Server Domain to which belongs this server handler. |
| | | * |
| | | * @param update The update that must be added to the list of updates. |
| | | * @param sourceHandler The server that sent the update. |
| | | * @return The replication server domain. |
| | | */ |
| | | public void add(UpdateMsg update, ServerHandler sourceHandler) |
| | | public ReplicationServerDomain getDomain() |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | | * If queue was empty the writer thread was probably asleep |
| | | * waiting for some changes, wake it up |
| | | */ |
| | | if (msgQueue.isEmpty()) |
| | | msgQueue.notify(); |
| | | |
| | | msgQueue.add(update); |
| | | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | while ((msgQueue.count() > maxQueueSize) || |
| | | (msgQueue.bytesCount() > maxQueueBytesSize)) |
| | | { |
| | | setFollowing(false); |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | if (isSaturated(update.getChangeNumber(), sourceHandler)) |
| | | { |
| | | sourceHandler.setSaturated(true); |
| | | } |
| | | |
| | | } |
| | | |
| | | private void setSaturated(boolean value) |
| | | { |
| | | flowControl = value; |
| | | return this.replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | | * Select the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | * |
| | | * @return the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | * Returns the value of generationId for that handler. |
| | | * @return The value of the generationId. |
| | | */ |
| | | public UpdateMsg take() |
| | | public long getGenerationId() |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMsg msg = getnextMessage(); |
| | | |
| | | /* |
| | | * When we remove a message from the queue we need to check if another |
| | | * server is waiting in flow control because this queue was too long. |
| | | * This check might cause a performance penalty an therefore it |
| | | * is not done for every message removed but only every few messages. |
| | | */ |
| | | if (++saturationCount > 10) |
| | | { |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | replicationServerDomain.checkAllSaturation(); |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | } |
| | | boolean acquired = false; |
| | | do |
| | | { |
| | | try |
| | | { |
| | | acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while (((interrupted) || (!acquired)) && (!shutdownWriter)); |
| | | if (msg != null) |
| | | { |
| | | incrementOutCount(); |
| | | if (msg.isAssured()) |
| | | { |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | incrementAssuredSrSentUpdates(); |
| | | } else |
| | | { |
| | | if (!isLDAPserver()) |
| | | incrementAssuredSdSentUpdates(); |
| | | } |
| | | } |
| | | } |
| | | return msg; |
| | | return generationId; |
| | | } |
| | | |
| | | /** |
| | | * Get the next update that must be sent to the server |
| | | * from the message queue or from the database. |
| | | * |
| | | * @return The next update that must be sent to the server. |
| | | * Gets the group id of the server represented by this object. |
| | | * @return The group id of the server represented by this object. |
| | | */ |
| | | private UpdateMsg getnextMessage() |
| | | public byte getGroupId() |
| | | { |
| | | UpdateMsg msg; |
| | | while (activeWriter == true) |
| | | { |
| | | if (following == false) |
| | | { |
| | | /* this server is late with regard to some other masters |
| | | * in the topology or just joined the topology. |
| | | * In such cases, we can't keep all changes in the queue |
| | | * without saturating the memory, we therefore use |
| | | * a lateQueue that is filled with a few changes from the changelogDB |
| | | * If this server is able to close the gap, it will start using again |
| | | * the regular msgQueue later. |
| | | */ |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | /* |
| | | * Start from the server State |
| | | * Loop until the queue high mark or until no more changes |
| | | * for each known LDAP master |
| | | * get the next CSN after this last one : |
| | | * - try to get next from the file |
| | | * - if not found in the file |
| | | * - try to get the next from the queue |
| | | * select the smallest of changes |
| | | * check if it is in the memory tree |
| | | * yes : lock memory tree. |
| | | * check all changes from the list, remove the ones that |
| | | * are already sent |
| | | * unlock memory tree |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | * |
| | | */ |
| | | ReplicationIteratorComparator comparator = |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | /* fill the lateQueue */ |
| | | for (short serverId : replicationServerDomain.getServers()) |
| | | { |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | ReplicationIterator iterator = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | if (iterator != null) |
| | | { |
| | | if (iterator.getChange() != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // The loop below relies on the fact that it is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change across all servers. |
| | | // Hence it is necessary to remove and eventual add again an iterator |
| | | // when looping in order to keep consistent the order of the |
| | | // iterators (see ReplicationIteratorComparator. |
| | | while (!iteratorSortedSet.isEmpty() && |
| | | (lateQueue.count()<100) && |
| | | (lateQueue.bytesCount()<50000) ) |
| | | { |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | if (iterator.next()) |
| | | iteratorSortedSet.add(iterator); |
| | | else |
| | | iterator.releaseCursor(); |
| | | } |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | /* |
| | | * Check if the first change in the lateQueue is also on the regular |
| | | * queue |
| | | */ |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | if ((msgQueue.count() < maxQueueSize) && |
| | | (msgQueue.bytesCount() < maxQueueBytesSize)) |
| | | { |
| | | setFollowing(true); |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | msg = lateQueue.first(); |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.contains(msg)) |
| | | { |
| | | /* we finally catch up with the regular queue */ |
| | | setFollowing(true); |
| | | lateQueue.clear(); |
| | | UpdateMsg msg1; |
| | | do |
| | | { |
| | | msg1 = msgQueue.removeFirst(); |
| | | } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); |
| | | this.updateServerState(msg); |
| | | return msg; |
| | | } |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | /* get the next change from the lateQueue */ |
| | | msg = lateQueue.removeFirst(); |
| | | this.updateServerState(msg); |
| | | return msg; |
| | | } |
| | | } |
| | | synchronized (msgQueue) |
| | | { |
| | | if (following == true) |
| | | { |
| | | try |
| | | { |
| | | while (msgQueue.isEmpty() && (following == true)) |
| | | { |
| | | msgQueue.wait(500); |
| | | if (!activeWriter) |
| | | return null; |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | | return null; |
| | | } |
| | | if (following == true) |
| | | { |
| | | msg = msgQueue.removeFirst(); |
| | | if (this.updateServerState(msg)) |
| | | { |
| | | /* |
| | | * Only push the message if it has not yet been seen |
| | | * by the other server. |
| | | * Otherwise just loop to select the next message. |
| | | */ |
| | | return msg; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | /* |
| | | * Need to loop because following flag may have gone to false between |
| | | * the first check at the beginning of this method |
| | | * and the second check just above. |
| | | */ |
| | | } |
| | | return null; |
| | | return groupId; |
| | | } |
| | | |
| | | /** |
| | | * Update the serverState with the last message sent. |
| | | * |
| | | * @param msg the last update sent. |
| | | * @return boolean indicating if the update was meaningful. |
| | | * Get our heartbeat interval. |
| | | * @return Our heartbeat interval. |
| | | */ |
| | | public boolean updateServerState(UpdateMsg msg) |
| | | public long getHeartbeatInterval() |
| | | { |
| | | return serverState.update(msg.getChangeNumber()); |
| | | return heartbeatInterval; |
| | | } |
| | | |
| | | /** |
| | | * Get the state of this server. |
| | | * |
| | | * @return ServerState the state for this server.. |
| | | * Get the count of updates received from the server. |
| | | * @return the count of update received from the server. |
| | | */ |
| | | public ServerState getServerState() |
| | | public int getInCount() |
| | | { |
| | | return serverState; |
| | | } |
| | | |
| | | /** |
| | | * Publish an ack message on the session of the server represented by this |
| | | * object. |
| | | * |
| | | * @param ack The ack message to publish. |
| | | * @throws IOException When publishing the ack on the session fails. |
| | | */ |
| | | public void sendAck(AckMsg ack) throws IOException |
| | | { |
| | | this.session.publish(ack); |
| | | } |
| | | |
| | | /** |
| | | * Tell which type of server is handled. |
| | | * |
| | | * @return true when the handled server is an LDAP server, |
| | | * false when it is a replication server. |
| | | */ |
| | | public boolean isLDAPserver() |
| | | { |
| | | return this.serverIsLDAPserver; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException, InitializationException |
| | | { |
| | | // Intentionally empty: the configuration is not used and nothing is set up. |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the name of this monitor provider. It should be unique among all |
| | | * monitor providers, including all instances of the same monitor provider. |
| | | * |
| | | * @return The name of this monitor provider. |
| | | */ |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | String str = serverURL + " " + String.valueOf(serverId); |
| | | |
| | | if (serverIsLDAPserver) |
| | | return "Connected Replica " + str + |
| | | ",cn=" + replicationServerDomain.getMonitorInstanceName(); |
| | | else |
| | | return "Connected Replication Server " + str + |
| | | ",cn=" + replicationServerDomain.getMonitorInstanceName(); |
| | | } |
| | | |
| | | /** |
| | | * Give the delay, in milliseconds, that should separate two calls to the |
| | | * <CODE>updateMonitorData()</CODE> method. A negative or zero value means |
| | | * that <CODE>updateMonitorData()</CODE> should not be invoked periodically. |
| | | * |
| | | * @return Always 0, since this monitor is not polled. |
| | | */ |
| | | @Override |
| | | public long getUpdateInterval() |
| | | { |
| | | // We do not want any polling on this monitor. |
| | | final long noPolling = 0; |
| | | return noPolling; |
| | | } |
| | | |
| | | /** |
| | | * Performs any processing periodic processing that may be desired to update |
| | | * the information associated with this monitor. Note that best-effort |
| | | * attempts will be made to ensure that calls to this method come |
| | | * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will |
| | | * be made. |
| | | */ |
| | | @Override |
| | | public void updateMonitorData() |
| | | { |
| | | // As long as getUpdateInterval() returns 0, this will never get called |
| | | return inCount; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | { |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | if (serverIsLDAPserver) |
| | | { |
| | | attributes.add(Attributes.create("replica", serverURL)); |
| | | attributes.add(Attributes.create("connected-to", |
| | | this.replicationServerDomain.getReplicationServer() |
| | | .getMonitorInstanceName())); |
| | | // Get the generic ones |
| | | ArrayList<Attribute> attributes = super.getMonitorData(); |
| | | |
| | | } |
| | | else |
| | | { |
| | | attributes.add(Attributes.create("Replication-Server", |
| | | serverURL)); |
| | | } |
| | | attributes.add(Attributes.create("server-id", String |
| | | .valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", baseDn.toString())); |
| | | |
| | | try |
| | | { |
| | | MonitorData md; |
| | | md = replicationServerDomain.computeMonitorData(); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | | // Oldest missing update |
| | | Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); |
| | | if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0)) |
| | | { |
| | | Date date = new Date(approxFirstMissingDate); |
| | | attributes.add(Attributes.create( |
| | | "approx-older-change-not-synchronized", date.toString())); |
| | | attributes.add(Attributes.create( |
| | | "approx-older-change-not-synchronized-millis", String |
| | | .valueOf(approxFirstMissingDate))); |
| | | } |
| | | |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChanges(serverId); |
| | | attributes.add(Attributes.create("missing-changes", String |
| | | .valueOf(missingChanges))); |
| | | |
| | | // Replication delay |
| | | long delay = md.getApproxDelay(serverId); |
| | | attributes.add(Attributes.create("approximate-delay", String |
| | | .valueOf(delay))); |
| | | |
| | | /* get the Server State */ |
| | | AttributeBuilder builder = new AttributeBuilder("server-state"); |
| | | ServerState state = md.getLDAPServerState(serverId); |
| | | if (state != null) |
| | | { |
| | | for (String str : state.toStringSet()) |
| | | { |
| | | builder.add(str); |
| | | } |
| | | attributes.add(builder.toAttribute()); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChangesRS(serverId); |
| | | attributes.add(Attributes.create("missing-changes", String |
| | | .valueOf(missingChanges))); |
| | | |
| | | /* get the Server State */ |
| | | AttributeBuilder builder = new AttributeBuilder("server-state"); |
| | | ServerState state = md.getRSStates(serverId); |
| | | if (state != null) |
| | | { |
| | | for (String str : state.toStringSet()) |
| | | { |
| | | builder.add(str); |
| | | } |
| | | attributes.add(builder.toAttribute()); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e)); |
| | | // We failed retrieving the monitor data. |
| | | attributes.add(Attributes.create("error", message.toString())); |
| | | } |
| | | |
| | | attributes.add( |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.count()))); |
| | | attributes.add( |
| | | Attributes.create( |
| | | "queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); |
| | | attributes.add( |
| | | Attributes.create( |
| | | "following", String.valueOf(following))); |
| | | attributes.add(Attributes.create("server-id", String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", getServiceId().toString())); |
| | | |
| | | // Deprecated |
| | | attributes.add(Attributes.create("max-waiting-changes", String |
| | |
| | | attributes.add(Attributes.create("assured-sr-received-updates", String |
| | | .valueOf(getAssuredSrReceivedUpdates()))); |
| | | attributes.add(Attributes.create("assured-sr-received-updates-timeout", |
| | | String .valueOf(getAssuredSrReceivedUpdatesTimeout()))); |
| | | String .valueOf(getAssuredSrReceivedUpdatesTimeout()))); |
| | | attributes.add(Attributes.create("assured-sr-sent-updates", String |
| | | .valueOf(getAssuredSrSentUpdates()))); |
| | | attributes.add(Attributes.create("assured-sr-sent-updates-timeout", String |
| | | .valueOf(getAssuredSrSentUpdatesTimeout()))); |
| | | attributes.add(Attributes.create("assured-sd-received-updates", String |
| | | .valueOf(getAssuredSdReceivedUpdates()))); |
| | | if (!isLDAPserver()) |
| | | if (!isDataServer()) |
| | | { |
| | | attributes.add(Attributes.create("assured-sd-sent-updates", |
| | | String.valueOf(getAssuredSdSentUpdates()))); |
| | | String.valueOf(getAssuredSdSentUpdates()))); |
| | | attributes.add(Attributes.create("assured-sd-sent-updates-timeout", |
| | | String.valueOf(getAssuredSdSentUpdatesTimeout()))); |
| | | String.valueOf(getAssuredSdSentUpdatesTimeout()))); |
| | | } else |
| | | { |
| | | attributes.add(Attributes.create("assured-sd-received-updates-timeout", |
| | | String.valueOf(getAssuredSdReceivedUpdatesTimeout()))); |
| | | String.valueOf(getAssuredSdReceivedUpdatesTimeout()))); |
| | | } |
| | | |
| | | // Window stats |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the name of this monitor provider. It should be unique among all |
| | | * monitor providers, including all instances of the same monitor provider. |
| | | * Declared abstract here, so each concrete subclass supplies the name. |
| | | * |
| | | * @return The name of this monitor provider. |
| | | */ |
| | | @Override |
| | | public abstract String getMonitorInstanceName(); |
| | | |
| | | /** |
| | | * Get the time of the older update for that server. |
| | | * @return The time carried by the older update change number, or 0 when no |
| | | * older update change number is available. |
| | | */ |
| | | public long getOlderUpdateTime() |
| | | { |
| | | final ChangeNumber oldestCn = getOlderUpdateCN(); |
| | | return (oldestCn != null) ? oldestCn.getTime() : 0; |
| | | } |
| | | |
| | | /** |
| | | * Report how many updates were sent to this server. |
| | | * @return The current count of updates sent to this server. |
| | | */ |
| | | public int getOutCount() |
| | | { |
| | | return this.outCount; |
| | | } |
| | | |
| | | /** |
| | | * Report the protocol version in use with this remote server. |
| | | * @return The protocol version used with this remote server. |
| | | */ |
| | | public short getProtocolVersion() |
| | | { |
| | | return this.protocolVersion; |
| | | } |
| | | |
| | | /** |
| | | * Get the server id. |
| | | * |
| | | * @return The id of the server to which this object is linked. |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return this.serverId; |
| | | } |
| | | |
| | | /** |
| | | * Get the URL of this server handler. |
| | | * |
| | | * @return The URL for this server handler, in the form of an address and |
| | | * port separated by a colon. |
| | | */ |
| | | public String getServerURL() |
| | | { |
| | | return this.serverURL; |
| | | } |
| | | |
| | | /** |
| | | * Return the ServerStatus. Declared abstract here, so each concrete |
| | | * subclass decides how the status is obtained. |
| | | * @return The server status. |
| | | */ |
| | | protected abstract ServerStatus getStatus(); |
| | | |
| | | /** |
| | | * Give the delay, in milliseconds, that should separate two calls to the |
| | | * <CODE>updateMonitorData()</CODE> method. A negative or zero value means |
| | | * that <CODE>updateMonitorData()</CODE> should not be invoked periodically. |
| | | * |
| | | * @return Always 0, since this monitor is not polled. |
| | | */ |
| | | @Override |
| | | public long getUpdateInterval() |
| | | { |
| | | // We do not want any polling on this monitor. |
| | | final long noPolling = 0; |
| | | return noPolling; |
| | | } |
| | | |
| | | /** |
| | | * Add one to the count of updates received from the server in assured safe |
| | | * data mode. |
| | | */ |
| | | public void incrementAssuredSdReceivedUpdates() |
| | | { |
| | | assuredSdReceivedUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Add one to the counter of updates received from the server in assured safe |
| | | * data mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdReceivedUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSdReceivedUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Add one to the count of updates sent to the server in assured safe data |
| | | * mode. |
| | | */ |
| | | public void incrementAssuredSdSentUpdates() |
| | | { |
| | | assuredSdSentUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Add one to the counter of updates sent to the server in assured safe data |
| | | * mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdSentUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSdSentUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Add one to the count of updates received from the server in assured safe |
| | | * read mode. |
| | | */ |
| | | public void incrementAssuredSrReceivedUpdates() |
| | | { |
| | | assuredSrReceivedUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Add one to the counter of updates received from the server in assured safe |
| | | * read mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrReceivedUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSrReceivedUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Add one to the count of updates sent to the server in assured safe read |
| | | * mode. |
| | | */ |
| | | public void incrementAssuredSrSentUpdates() |
| | | { |
| | | assuredSrSentUpdates += 1; |
| | | } |
| | | |
| | | /** |
| | | * Add one to the counter of updates sent to the server in assured safe read |
| | | * mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrSentUpdatesTimeout() |
| | | { |
| | | // The returned value is not needed, only the increment matters. |
| | | assuredSrSentUpdatesTimeout.getAndIncrement(); |
| | | } |
| | | |
| | | /** |
| | | * Add one to the counter of updates received from the server. |
| | | */ |
| | | public void incrementInCount() |
| | | { |
| | | inCount += 1; |
| | | } |
| | | |
| | | /** |
| | | * Add one to the counter of updates sent to the server. |
| | | */ |
| | | public void incrementOutCount() |
| | | { |
| | | outCount += 1; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException, InitializationException |
| | | { |
| | | // Intentionally empty: the configuration is not used and nothing is set up. |
| | | } |
| | | |
| | | /** |
| | | * Check if the server associated to this ServerHandler is a data server |
| | | * in the topology. Declared abstract here, so each concrete subclass |
| | | * provides the answer. |
| | | * @return true if the server is a data server. |
| | | */ |
| | | public abstract boolean isDataServer(); |
| | | |
| | | /** |
| | | * Tell whether the server associated to this ServerHandler is a replication |
| | | * server, which is defined here as not being a data server. |
| | | * @return true if the server is a replication server. |
| | | */ |
| | | public boolean isReplicationServer() |
| | | { |
| | | final boolean dataServer = isDataServer(); |
| | | return !dataServer; |
| | | } |
| | | |
| | | /** |
| | | * Lock the replication server domain, either by blocking or by trying with |
| | | * a bounded, randomized timeout. |
| | | * |
| | | * @param timedout false to take the lock without a timeout (skipped when |
| | | * the domain reports that the lock is already held); true to try to take |
| | | * it with a timeout of 3 to 8 seconds. |
| | | * @throws DirectoryException When the timed attempt expires without |
| | | * obtaining the lock. |
| | | */ |
| | | protected void lockDomain(boolean timedout) |
| | | throws DirectoryException |
| | | { |
| | | // The handshake phase must be done by blocking any access to structures |
| | | // keeping info on connected servers, so that one can safely check for |
| | | // pre-existence of a server, send a coherent snapshot of known topology |
| | | // to peers, update the local view of the topology... |
| | | // |
| | | // For instance a kind of problem could be that while we connect with a |
| | | // peer RS, a DS is connecting at the same time and we could publish the |
| | | // connected DSs to the peer RS forgetting this last DS in the TopologyMsg. |
| | | // |
| | | // This method and every others that need to read/make changes to the |
| | | // structures holding topology for the domain should: |
| | | // - call ReplicationServerDomain.lock() |
| | | // - read/modify structures |
| | | // - call ReplicationServerDomain.release() |
| | | // |
| | | // More information is provided in comment of ReplicationServerDomain.lock() |
| | | |
| | | // If domain already exists, lock it until handshake is finished otherwise |
| | | // it will be created and locked later in the method |
| | | try |
| | | { |
| | | if (!timedout) |
| | | { |
| | | // No timeout requested: block, unless the lock is already held. |
| | | if (!replicationServerDomain.hasLock()) |
| | | replicationServerDomain.lock(); |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * Take the lock on the domain. |
| | | * WARNING: Here we try to acquire the lock with a timeout. This |
| | | * is for preventing a deadlock that may happen if there are cross |
| | | * connection attempts (for same domain) from this replication |
| | | * server and from a peer one: |
| | | * Here is the scenario: |
| | | * - RS1 connect thread takes the domain lock and starts |
| | | * connection to RS2 |
| | | * - at the same time RS2 connect thread takes his domain lock and |
| | | * starts connection to RS1 |
| | | * - RS2 listen thread starts processing received |
| | | * ReplServerStartMsg from RS1 and wants to acquire the lock on |
| | | * the domain (here) but cannot as RS2 connect thread already has |
| | | * it |
| | | * - RS1 listen thread starts processing received |
| | | * ReplServerStartMsg from RS2 and wants to acquire the lock on |
| | | * the domain (here) but cannot as RS1 connect thread already has |
| | | * it |
| | | * => Deadlock: 4 threads are locked. |
| | | * So to prevent that in such situation, the listen threads here |
| | | * will both timeout trying to acquire the lock. The random time |
| | | * for the timeout should allow one connection attempt to be |
| | | * aborted whereas the other one should have time to finish in the |
| | | * same time. |
| | | * Warning: the minimum time (3s) should be big enough to allow |
| | | * normal situation connections to terminate. The added random |
| | | * time should represent a big enough range so that the chance to |
| | | * have one listen thread timing out a lot before the peer one is |
| | | * great. When the first listen thread times out, the remote |
| | | * connect thread should release the lock and allow the peer |
| | | * listen thread to take the lock it was waiting for and process |
| | | * the connection attempt. |
| | | */ |
| | | Random random = new Random(); |
| | | int randomTime = random.nextInt(6); // Random from 0 to 5 |
| | | // Wait at least 3 seconds + (0 to 5 seconds) |
| | | long timeout = (long) (3000 + ( randomTime * 1000 ) ); |
| | | boolean noTimeout = replicationServerDomain.tryLock(timeout); |
| | | if (!noTimeout) |
| | | { |
| | | // Timeout |
| | | Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get( |
| | | getServiceId(), |
| | | Short.toString(serverId), |
| | | Short.toString(replicationServerId)); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Thread interrupted: log it, then restore the interrupt status that the |
| | | // exception cleared so that code further up the stack can still see it. |
| | | // NOTE(review): the method returns normally here, presumably without the |
| | | // lock having been taken; confirm that callers cope with that. |
| | | Message message = ERR_EXCEPTION_LOCKING_RS_DOMAIN.get(e.getMessage()); |
| | | logError(message); |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Hands a routable message over to the replication server domain, |
| | | * tracing it first when debug is enabled. |
| | | * |
| | | * @param msg The routable message to be processed. |
| | | */ |
| | | public void process(RoutableMsg msg) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | final String localName = replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(); |
| | | TRACER.debugInfo("In " + localName + this + |
| | | " processes received msg:\n" + msg); |
| | | } |
| | | replicationServerDomain.process(msg, this); |
| | | } |
| | | |
| | | /** |
| | | * Process the reception of a WindowProbeMsg message. |
| | | * |
| | | * @param windowProbeMsg The message to process. |
| | | * |
| | | * @throws IOException When the session becomes unavailable. |
| | | */ |
| | | public void process(WindowProbeMsg windowProbeMsg) throws IOException |
| | | { |
| | | if (rcvWindow > 0) |
| | | { |
| | | // The LDAP server believes that its window is closed |
| | | // while it is not, this means that some problem happened in the |
| | | // window exchange procedure ! |
| | | // lets update the LDAP server with out current window size and hope |
| | | // that everything will work better in the futur. |
| | | // TODO also log an error message. |
| | | WindowMsg msg = new WindowMsg(rcvWindow); |
| | | session.publish(msg); |
| | | } else |
| | | { |
| | | // Both the LDAP server and the replication server believes that the |
| | | // window is closed. Lets check the flowcontrol in case we |
| | | // can now resume operations and send a windowMessage if necessary. |
| | | checkWindow(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Publishes a routable message on the session of this handler, tracing |
| | | * it first when debug is enabled. |
| | | * |
| | | * @param msg The routable message to be sent. |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void send(RoutableMsg msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | final String localName = replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(); |
| | | TRACER.debugInfo("In " + localName + this + |
| | | " publishes message:\n" + msg); |
| | | } |
| | | session.publish(msg); |
| | | } |
| | | |
| | | /** |
| | | * Publishes the given ack message on the session of this handler. |
| | | * |
| | | * @param ack The ack message to publish. |
| | | * @throws IOException If publishing the ack on the session fails. |
| | | */ |
| | | public void sendAck(AckMsg ack) throws IOException |
| | | { |
| | | this.session.publish(ack); |
| | | } |
| | | |
| | | /** |
| | | * Publishes the given ErrorMsg on the session of this handler. |
| | | * |
| | | * @param errorMsg The error message to publish. |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void sendError(ErrorMsg errorMsg) throws IOException |
| | | { |
| | | this.session.publish(errorMsg); |
| | | } |
| | | |
| | | /** |
| | | * Builds a ReplServerStartMsg and publishes it to the remote server |
| | | * (RS or DS). |
| | | * <p> |
| | | * The local generation id is refreshed from the replication server domain |
| | | * before the message is built. |
| | | * |
| | | * @param requestedProtocolVersion The protocol version to publish the |
| | | * message with; when it is not strictly positive the message is |
| | | * published without an explicit version. |
| | | * @return The ReplServerStartMsg sent. |
| | | * @throws IOException When an exception occurs. |
| | | */ |
| | | public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion) |
| | | throws IOException |
| | | { |
| | | this.localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | final ReplServerStartMsg startMsg = new ReplServerStartMsg( |
| | | replicationServerId, |
| | | replicationServerURL, |
| | | getServiceId(), |
| | | maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | protocolVersion, |
| | | localGenerationId, |
| | | sslEncryption, |
| | | getLocalGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | |
| | | if (requestedProtocolVersion <= 0) |
| | | { |
| | | session.publish(startMsg); |
| | | } |
| | | else |
| | | { |
| | | session.publish(startMsg, requestedProtocolVersion); |
| | | } |
| | | |
| | | return startMsg; |
| | | } |
| | | |
| | | /** |
| | | * Publishes the provided TopologyMsg to the peer server, unless the peer |
| | | * uses protocol version V1 or lower. |
| | | * |
| | | * @param topoMsg The TopologyMsg message to be sent. |
| | | * @throws IOException When it occurs while sending the message. |
| | | */ |
| | | public void sendTopoInfo(TopologyMsg topoMsg) |
| | | throws IOException |
| | | { |
| | | // V1 Rs do not support the TopologyMsg: nothing is sent to them |
| | | if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | return; |
| | | } |
| | | session.publish(topoMsg); |
| | | } |
| | | |
| | | /** |
| | | * Records a new generation ID in this handler. |
| | | * |
| | | * @param generationId The generation ID to store. |
| | | */ |
| | | public void setGenerationId(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Associates this handler with the provided replication server domain. |
| | | * |
| | | * @param rsd The replication server domain to associate. |
| | | */ |
| | | protected void setReplicationServerDomain(ReplicationServerDomain rsd) |
| | | { |
| | | replicationServerDomain = rsd; |
| | | } |
| | | |
| | | /** |
| | | * Records the window size to be used when sending to the remote. |
| | | * |
| | | * @param size The window size to store. |
| | | */ |
| | | protected void setSendWindowSize(int size) |
| | | { |
| | | sendWindowSize = size; |
| | | } |
| | | |
| | | /** |
| | | * Raises the flag requesting the writer to shut down. |
| | | */ |
| | | protected void shutdownWriter() |
| | | { |
| | | this.shutdownWriter = true; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown This ServerHandler. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | /* |
| | | * Shutdown ServerWriter |
| | | */ |
| | | shutdownWriter = true; |
| | | activeWriter = false; |
| | | synchronized (msgQueue) |
| | | { |
| | | /* wake up the writer thread on an empty queue so that it disappear */ |
| | | msgQueue.clear(); |
| | | msgQueue.notify(); |
| | | msgQueue.notifyAll(); |
| | | } |
| | | shutdownWriter(); |
| | | setConsumerActive(false); |
| | | super.shutdown(); |
| | | |
| | | /* |
| | | * Close session to end ServerReader or ServerWriter |
| | | */ |
| | | try |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore. |
| | | } |
| | | |
| | | /* |
| | | * Stop the remote LSHandler |
| | | */ |
| | | synchronized (directoryServers) |
| | | { |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | // Close session to end ServerReader or ServerWriter |
| | | try |
| | | { |
| | | lsh.stopHandler(); |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore. |
| | | } |
| | | directoryServers.clear(); |
| | | } |
| | | |
| | | /* |
| | |
| | | { |
| | | // don't try anymore to join and return. |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("SH.shutdowned(" + this + ")"); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String localString; |
| | | if (serverId != 0) |
| | | { |
| | | if (serverIsLDAPserver) |
| | | localString = "Directory Server "; |
| | | else |
| | | localString = "Replication Server "; |
| | | |
| | | |
| | | localString += serverId + " " + serverURL + " " + baseDn; |
| | | } else |
| | | localString = "Unknown server"; |
| | | |
| | | return localString; |
| | | } |
| | | |
| | | /** |
| | | * Decrement the protocol window, then check if it is necessary |
| | | * to send a WindowMsg and send it. |
| | | * Select the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | * |
| | | * NOTE(review): the rows below appear to interleave lines from more than |
| | | * one revision (decAndCheckWindow()/checkWindow() on one side, take() on |
| | | * the other): two method signatures follow each other and braces do not |
| | | * pair up as shown. Confirm against the real source file before changing |
| | | * any code in this span. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | * @return the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | */ |
| | | public synchronized void decAndCheckWindow() throws IOException |
| | | public UpdateMsg take() |
| | | { |
| | | rcvWindow--; |
| | | checkWindow(); |
| | | } |
| | | boolean interrupted = true; |
| | | UpdateMsg msg = getnextMessage(true); // synchronous:block until msg |
| | | |
| | | /** |
| | | * Check the protocol window and send WindowMsg if necessary. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void checkWindow() throws IOException |
| | | { |
| | | if (rcvWindow < rcvWindowSizeHalf) |
| | | /* |
| | | * When we remove a message from the queue we need to check if another |
| | | * server is waiting in flow control because this queue was too long. |
| | | * This check might cause a performance penalty an therefore it |
| | | * is not done for every message removed but only every few messages. |
| | | */ |
| | | if (++saturationCount > 10) |
| | | { |
| | | if (flowControl) |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | if (replicationServerDomain.restartAfterSaturation(this)) |
| | | { |
| | | flowControl = false; |
| | | } |
| | | } |
| | | if (!flowControl) |
| | | replicationServerDomain.checkAllSaturation(); |
| | | } catch (IOException e) |
| | | { |
| | | WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | | boolean acquired = false; |
| | | do |
| | | { |
| | | try |
| | | { |
| | | acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while (((interrupted) || (!acquired)) && (!shutdownWriter)); |
| | | if (msg != null) |
| | | { |
| | | incrementOutCount(); |
| | | |
| | | if (msg.isAssured()) |
| | | { |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | incrementAssuredSrSentUpdates(); |
| | | } else |
| | | { |
| | | if (!isDataServer()) |
| | | incrementAssuredSdSentUpdates(); |
| | | } |
| | | } |
| | | } |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | | * Builds a RSInfo structure from the server id, generation id and group id |
| | | * held by this handler. |
| | | * |
| | | * @return The RSInfo structure representing this remote RS |
| | | */ |
| | | public RSInfo toRSInfo() |
| | | { |
| | | return new RSInfo(serverId, generationId, groupId); |
| | | } |
| | | |
| | | /** |
| | | * Periodic processing hook meant to refresh the information associated |
| | | * with this monitor. Calls are attempted on a best-effort basis every |
| | | * <CODE>getUpdateInterval()</CODE> milliseconds, without any guarantee. |
| | | * This implementation does nothing. |
| | | */ |
| | | @Override |
| | | public void updateMonitorData() |
| | | { |
| | | // Intentionally empty: as long as getUpdateInterval() returns 0, |
| | | // this method never gets called. |
| | | } |
| | | /** |
| | | * Update the send window size based on the credit specified in the |
| | | * given window message. |
| | | * |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get our heartbeat interval. |
| | | * |
| | | * NOTE(review): the "Log the messages..." sentence and the two param tags |
| | | * below do not match this parameterless getter; they look like rows from |
| | | * another method's Javadoc interleaved here. Confirm against the real |
| | | * source file. |
| | | * |
| | | * @return Our heartbeat interval. |
| | | * Log the messages involved in the start handshake. |
| | | * @param inStartMsg The message received first. |
| | | * @param outStartMsg The message sent in response. |
| | | */ |
| | | public long getHeartbeatInterval() |
| | | { |
| | | return heartbeatInterval; |
| | | } |
| | | |
| | | /** |
| | | * Processes a routable message. |
| | | * |
| | | * NOTE(review): two method signatures follow this comment (process and |
| | | * logStartHandshakeRCVandSND) but only one body, which uses msg. This looks |
| | | * like rows from two revisions interleaved; confirm against the real |
| | | * source file before changing any code here. |
| | | * |
| | | * @param msg The message to be processed. |
| | | */ |
| | | public void process(RoutableMsg msg) |
| | | protected void logStartHandshakeRCVandSND( |
| | | StartMsg inStartMsg, |
| | | StartMsg outStartMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + ":" + |
| | | "\nprocesses received msg:\n" + msg); |
| | | replicationServerDomain.process(msg, this); |
| | | } |
| | | |
| | | /** |
| | | * Publishes the provided TopologyMsg to the peer server, unless the peer |
| | | * uses protocol version V1 or lower. The message is traced before being |
| | | * sent when debug is enabled. |
| | | * |
| | | * @param topoMsg The TopologyMsg message to be sent. |
| | | * @throws IOException When it occurs while sending the message. |
| | | */ |
| | | public void sendTopoInfo(TopologyMsg topoMsg) |
| | | throws IOException |
| | | { |
| | | // V1 Rs do not support the TopologyMsg: nothing is sent to them |
| | | if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + ":" + |
| | | "\nsends message:\n" + topoMsg); |
| | | } |
| | | |
| | | session.publish(topoMsg); |
| | | } |
| | | |
| | | /** |
| | | * Records the topology information received from a peer RS: the generation |
| | | * id and group id of the sender, and one lightweight handler per DS listed |
| | | * in the message. Existing lightweight handlers are stopped and discarded |
| | | * first. |
| | | * |
| | | * @param topoMsg The received topology message |
| | | */ |
| | | public void receiveTopoInfoFromRS(TopologyMsg topoMsg) |
| | | { |
| | | // Info for the remote RS: the list should only contain the RS info of |
| | | // the sender, so only the first entry is read |
| | | final RSInfo senderInfo = topoMsg.getRsList().get(0); |
| | | generationId = senderInfo.getGenerationId(); |
| | | groupId = senderInfo.getGroupId(); |
| | | |
| | | // Info for the DSs connected to the peer RS |
| | | final List<DSInfo> remoteDsInfos = topoMsg.getDsList(); |
| | | |
| | | synchronized (directoryServers) |
| | | { |
| | | // Stop and drop the handlers built from the previous topology message |
| | | for (LightweightServerHandler previous : directoryServers.values()) |
| | | { |
| | | previous.stopHandler(); |
| | | } |
| | | directoryServers.clear(); |
| | | |
| | | // Build and start one handler per DS described in the received message |
| | | for (DSInfo remoteDs : remoteDsInfos) |
| | | { |
| | | final LightweightServerHandler created = |
| | | new LightweightServerHandler(this, |
| | | serverId, remoteDs.getDsId(), remoteDs.getGenerationId(), |
| | | remoteDs.getGroupId(), remoteDs.getStatus(), remoteDs.getRefUrls(), |
| | | remoteDs.isAssured(), remoteDs.getAssuredMode(), |
| | | remoteDs.getSafeDataLevel()); |
| | | created.startHandler(); |
| | | directoryServers.put(created.getServerId(), created); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Handles a ChangeStatusMsg in which a remote server announces its new |
| | | * status. The status is only accepted when the sender is a DS, the |
| | | * announced status maps to a valid event and the status machine allows |
| | | * the transition; otherwise an error is logged and the current status is |
| | | * left untouched. |
| | | * |
| | | * @param csMsg The message containing the new status |
| | | * @return The new server status of the DS, or INVALID_STATUS when the |
| | | * change is rejected |
| | | */ |
| | | public ServerStatus processNewStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | // Sanity check: only a DS may send this message |
| | | if (!serverIsLDAPserver) |
| | | { |
| | | logError(ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(baseDn.toString(), |
| | | Short.toString(serverId), csMsg.toString())); |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | |
| | | // Status the DS just entered, translated to a state machine event |
| | | final ServerStatus requestedStatus = csMsg.getNewStatus(); |
| | | final StatusMachineEvent statusEvent = |
| | | StatusMachineEvent.statusToEvent(requestedStatus); |
| | | if (statusEvent == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | logError(ERR_RS_INVALID_NEW_STATUS.get(requestedStatus.toString(), |
| | | baseDn.toString(), Short.toString(serverId))); |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | |
| | | // Ask the state machine whether the transition is allowed |
| | | final ServerStatus computedStatus = |
| | | StatusMachine.computeNewStatus(status, statusEvent); |
| | | if (computedStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | logError(ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), statusEvent.toString())); |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | |
| | | status = computedStatus; |
| | | |
| | | return status; |
| | | } |
| | | |
| | | /** |
| | | * Applies a status change requested by the status analyzer: computes the |
| | | * new status from the given event, sends a ChangeStatusMsg to the peer and |
| | | * records the new status. When the status machine rejects the transition, |
| | | * an error is logged and nothing is sent. |
| | | * |
| | | * @param event The event to be used for new status computation |
| | | * @return The new status of the DS (INVALID_STATUS when the transition is |
| | | * rejected) |
| | | * @throws IOException When raised by the underlying session |
| | | */ |
| | | public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event) |
| | | throws IOException |
| | | { |
| | | // Sanity check: the state machine must allow this new status |
| | | final ServerStatus computedStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | if (computedStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | logError(ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString())); |
| | | // The status analyzer must only switch between NORMAL_STATUS and |
| | | // DEGRADED_STATUS. We may be trying to change the status while another |
| | | // status has just been entered (e.g. a full update has just been |
| | | // engaged). In that case the attempt is simply ignored. |
| | | return computedStatus; |
| | | } |
| | | |
| | | // Build the message requesting the DS to change its status |
| | | final ChangeStatusMsg changeRequest = new ChangeStatusMsg(computedStatus, |
| | | ServerStatus.INVALID_STATUS); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | " Sending change status from status analyzer to " + getServerId() + |
| | | " for baseDn " + baseDn + ":\n" + changeRequest); |
| | | } |
| | | |
| | | session.publish(changeRequest); |
| | | |
| | | status = computedStatus; |
| | | |
| | | return computedStatus; |
| | | } |
| | | |
| | | /** |
| | | * When this handler is connected to a replication server, specifies if |
| | | * a wanted server is connected to this replication server. |
| | | * |
| | | * @param wantedServer The server we want to know if it is connected |
| | | * to the replication server represented by this handler. |
| | | * @return boolean True is the wanted server is connected to the server |
| | | * represented by this handler. |
| | | */ |
| | | public boolean isRemoteLDAPServer(short wantedServer) |
| | | { |
| | | synchronized (directoryServers) |
| | | { |
| | | for (LightweightServerHandler server : directoryServers.values()) |
| | | { |
| | | if (wantedServer == server.getServerId()) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether directoryServers currently holds at least one entry, i.e. |
| | | * whether the replication server represented by this handler has remote |
| | | * LDAP servers connected to it. |
| | | * |
| | | * @return boolean True if the replication server has remote LDAP servers |
| | | * connected to it. |
| | | */ |
| | | public boolean hasRemoteLDAPServers() |
| | | { |
| | | synchronized (directoryServers) |
| | | { |
| | | final boolean noRemoteServer = directoryServers.isEmpty(); |
| | | return !noRemoteServer; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Publishes a routable message on the session of this handler, tracing |
| | | * it first when debug is enabled. |
| | | * |
| | | * @param msg The routable message to be sent. |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void send(RoutableMsg msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | final String localName = replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(); |
| | | TRACER.debugInfo("In " + localName + |
| | | " SH for remote server " + this.getMonitorInstanceName() + ":" + |
| | | "\nsends message:\n" + msg); |
| | | } |
| | | session.publish(msg); |
| | | } |
| | | |
| | | /** |
| | | * Publishes the given ErrorMsg on the session of this handler. |
| | | * |
| | | * @param errorMsg The error message to publish. |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void sendError(ErrorMsg errorMsg) throws IOException |
| | | { |
| | | this.session.publish(errorMsg); |
| | | } |
| | | |
| | | /** |
| | | * Process the reception of a WindowProbeMsg message. |
| | | * |
| | | * NOTE(review): the four rows following the checkWindow() call in the else |
| | | * branch are a dangling string expression mentioning inStartMsg and |
| | | * outStartMsg, which are not declared in this method. They look like rows |
| | | * from another method (a start-handshake trace) interleaved here; confirm |
| | | * against the real source file before changing any code in this span. |
| | | * |
| | | * @param windowProbeMsg The message to process. |
| | | * |
| | | * @throws IOException When the session becomes unavailable. |
| | | */ |
| | | public void process(WindowProbeMsg windowProbeMsg) throws IOException |
| | | { |
| | | if (rcvWindow > 0) |
| | | { |
| | | // The LDAP server believes that its window is closed |
| | | // while it is not, this means that some problem happened in the |
| | | // window exchange procedure ! |
| | | // lets update the LDAP server with out current window size and hope |
| | | // that everything will work better in the futur. |
| | | // TODO also log an error message. |
| | | WindowMsg msg = new WindowMsg(rcvWindow); |
| | | session.publish(msg); |
| | | } else |
| | | { |
| | | // Both the LDAP server and the replication server believes that the |
| | | // window is closed. Lets check the flowcontrol in case we |
| | | // can now resume operations and send a windowMessage if necessary. |
| | | checkWindow(); |
| | | getMonitorInstanceName() + ", " + |
| | | this.getClass().getSimpleName() + " " + this + ":" + |
| | | "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+ |
| | | "\nAND REPLIED:\n" + outStartMsg.toString()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the value of generationId for that handler. |
| | | * |
| | | * NOTE(review): two method signatures follow this comment (getGenerationId |
| | | * and logStartHandshakeSNDandRCV) but the only body shown just returns |
| | | * generationId. The "Log the messages..." sentence and the param tags below |
| | | * presumably belong to the second signature; this looks like rows from two |
| | | * revisions interleaved. Confirm against the real source file. |
| | | * |
| | | * @return The value of the generationId. |
| | | * Log the messages involved in the start handshake. |
| | | * @param outStartMsg The message sent first. |
| | | * @param inStartMsg The message received in response. |
| | | */ |
| | | public long getGenerationId() |
| | | protected void logStartHandshakeSNDandRCV( |
| | | StartMsg outStartMsg, |
| | | StartMsg inStartMsg) |
| | | { |
| | | return generationId; |
| | | } |
| | | |
| | | /** |
| | | * Publishes a ResetGenerationIdMsg on the session of this handler. |
| | | * The peer is expected to be a replication server. |
| | | * |
| | | * @param msg The generation id message to be sent. |
| | | * @throws IOException When it occurs while sending the message. |
| | | */ |
| | | public void forwardGenerationIdToRS(ResetGenerationIdMsg msg) |
| | | throws IOException |
| | | { |
| | | this.session.publish(msg); |
| | | } |
| | | |
| | | /** |
| | | * Records a new generation ID in this handler. |
| | | * |
| | | * @param generationId The generation ID to store. |
| | | */ |
| | | public void setGenerationId(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Gives access to the replication server domain this server handler is |
| | | * associated with. |
| | | * |
| | | * @return The replication server domain. |
| | | */ |
| | | public ReplicationServerDomain getDomain() |
| | | { |
| | | return replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | | * Returns the key set of directoryServers, i.e. the ids of the servers |
| | | * known by this replicationServer. The key set is obtained while holding |
| | | * the monitor of directoryServers and is returned as is, without copy. |
| | | * |
| | | * @return a set containing the servers known by this replicationServer. |
| | | */ |
| | | public Set<Short> getConnectedDirectoryServerIds() |
| | | { |
| | | synchronized (directoryServers) |
| | | { |
| | | final Set<Short> knownServerIds = directoryServers.keySet(); |
| | | return knownServerIds; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Order the peer DS server to change his status or close the connection |
| | | * according to the requested new generation id. |
| | | * |
| | | * Summary of the visible logic: a new generation id of -1, or one that |
| | | * differs from the generation id of this handler, leads to a |
| | | * TO_BAD_GEN_ID_STATUS_EVENT. When the ids match, the session is closed if |
| | | * the current status is BAD_GEN_ID_STATUS (status then becomes |
| | | * NOT_CONNECTED_STATUS), otherwise nothing is sent. When an event was |
| | | * selected, the method returns early (after logging) if the current status |
| | | * is FULL_UPDATE_STATUS or if the status machine rejects the transition; |
| | | * else a ChangeStatusMsg is published and the new status is recorded. |
| | | * |
| | | * NOTE(review): inside the last debugEnabled() block, the second |
| | | * TRACER.debugInfo call ("SH START HANDSHAKE SENT") uses outStartMsg and |
| | | * inStartMsg, which are not declared in this method. It looks like rows |
| | | * from another method interleaved here; confirm against the real source |
| | | * file before changing any code in this span. |
| | | * |
| | | * @param newGenId The new generation id to take into account |
| | | * @throws IOException If IO error occurred. |
| | | */ |
| | | public void changeStatusForResetGenId(long newGenId) |
| | | throws IOException |
| | | { |
| | | StatusMachineEvent event = null; |
| | | |
| | | if (newGenId == -1) |
| | | { |
| | | // The generation id is being made invalid, let's put the DS |
| | | // into BAD_GEN_ID_STATUS |
| | | event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT; |
| | | } else |
| | | { |
| | | if (newGenId == generationId) |
| | | { |
| | | if (status == ServerStatus.BAD_GEN_ID_STATUS) |
| | | { |
| | | // This server has the good new reference generation id. |
| | | // Close connection with him to force his reconnection: DS will |
| | | // reconnect in NORMAL_STATUS or DEGRADED_STATUS. |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | ". Closing connection to DS " + getServerId() + |
| | | " for baseDn " + baseDn + " to force reconnection as new local" + |
| | | " generation id and remote one match and DS is in bad gen id: " + |
| | | newGenId); |
| | | } |
| | | |
| | | // Connection closure must not be done calling RSD.stopHandler() as it |
| | | // would rewait the RSD lock that we already must have entering this |
| | | // method. This would lead to a reentrant lock which we do not want. |
| | | // So simply close the session, this will make the hang up appear |
| | | // after the reader thread that took the RSD lock realeases it. |
| | | try |
| | | { |
| | | if (session != null) |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore |
| | | } |
| | | |
| | | // NOT_CONNECTED_STATUS is the last one in RS session life: handler |
| | | // will soon disappear after this method call... |
| | | status = ServerStatus.NOT_CONNECTED_STATUS; |
| | | return; |
| | | } else |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | ". DS " + getServerId() + " for baseDn " + baseDn + |
| | | " has already generation id " + newGenId + |
| | | " so no ChangeStatusMsg sent to him."); |
| | | } |
| | | return; |
| | | } |
| | | } else |
| | | { |
| | | // This server has a bad generation id compared to new reference one, |
| | | // let's put it into BAD_GEN_ID_STATUS |
| | | event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT; |
| | | } |
| | | } |
| | | |
| | | if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) && |
| | | (status == ServerStatus.FULL_UPDATE_STATUS)) |
| | | { |
| | | // Prevent useless error message (full update status cannot lead to bad |
| | | // gen status) |
| | | Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get( |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId()), |
| | | baseDn.toString(), |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(newGenId)); |
| | | logError(message); |
| | | return; |
| | | } |
| | | |
| | | ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Send message requesting to change the DS status |
| | | ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, |
| | | ServerStatus.INVALID_STATUS); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | " Sending change status for reset gen id to " + getServerId() + |
| | | " for baseDn " + baseDn + ":\n" + csMsg); |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ", " + |
| | | this.getClass().getSimpleName() + " " + this + ":" + |
| | | "\nSH START HANDSHAKE SENT("+ this + |
| | | "):\n" + outStartMsg.toString()+ |
| | | "\nAND RECEIVED:\n" + inStartMsg.toString()); |
| | | } |
| | | |
| | | session.publish(csMsg); |
| | | |
| | | status = newStatus; |
| | | } |
| | | |
| | | /** |
| | | * Set the shut down flag to true and returns the previous value of the flag. |
| | | * |
| | | * NOTE(review): two method signatures follow this comment (engageShutdown |
| | | * and logTopoHandshakeRCVandSND) but the only body shown sets and returns |
| | | * the shuttingDown flag. The "Log the messages..." sentence and the param |
| | | * tags below presumably belong to the second signature; this looks like |
| | | * rows from two revisions interleaved. Confirm against the real source |
| | | * file. |
| | | * |
| | | * @return The previous value of the shut down flag |
| | | * Log the messages involved in the Topology handshake. |
| | | * @param inTopoMsg The message received first. |
| | | * @param outTopoMsg The message sent in response. |
| | | */ |
| | | public boolean engageShutdown() |
| | | protected void logTopoHandshakeRCVandSND( |
| | | TopologyMsg inTopoMsg, |
| | | TopologyMsg outTopoMsg) |
| | | { |
| | | // Use thread safe boolean |
| | | return shuttingDown.getAndSet(true); |
| | | } |
| | | |
| | | /** |
| | | * Returns the status currently recorded for the connected DS. |
| | | * |
| | | * @return The status of the connected DS. |
| | | */ |
| | | public ServerStatus getStatus() |
| | | { |
| | | return this.status; |
| | | } |
| | | |
| | | /** |
| | | * Returns the protocol version in use with this remote server. |
| | | * |
| | | * @return The protocol version used with this remote server. |
| | | */ |
| | | public short getProtocolVersion() |
| | | { |
| | | return this.protocolVersion; |
| | | } |
| | | |
| | | /** |
| | | * Add the DSinfos of the connected Directory Servers |
| | | * to the List of DSInfo provided as a parameter. |
| | | * |
| | | * NOTE(review): the body below mixes the synchronized loop over |
| | | * directoryServers with a debugEnabled() trace that uses inTopoMsg and |
| | | * outTopoMsg, which are not declared in this method. This looks like rows |
| | | * from two revisions interleaved; confirm against the real source file |
| | | * before changing any code in this span. |
| | | * |
| | | * @param dsInfos The List of DSInfo that should be updated |
| | | * with the DSInfo for the directoryServers |
| | | * connected to this ServerHandler. |
| | | */ |
| | | public void addDSInfos(List<DSInfo> dsInfos) |
| | | { |
| | | synchronized (directoryServers) |
| | | if (debugEnabled()) |
| | | { |
| | | for (LightweightServerHandler ls : directoryServers.values()) |
| | | { |
| | | dsInfos.add(ls.toDSInfo()); |
| | | } |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ", " + |
| | | this.getClass().getSimpleName() + " " + this + ":" + |
| | | "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() + |
| | | "\nAND REPLIED:\n" + outTopoMsg.toString()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gets the group id of the server represented by this object. |
| | | * |
| | | * NOTE(review): two method signatures follow this comment (getGroupId and |
| | | * logTopoHandshakeSNDandRCV), and the body starts with "return groupId;" |
| | | * followed by a debug trace of outTopoMsg and inTopoMsg. This looks like |
| | | * rows from two revisions interleaved; confirm against the real source |
| | | * file before changing any code in this span. |
| | | * |
| | | * @return The group id of the server represented by this object. |
| | | * Log the messages involved in the Topology handshake. |
| | | * @param outTopoMsg The message sent first. |
| | | * @param inTopoMsg The message received in response. |
| | | */ |
| | | public byte getGroupId() |
| | | protected void logTopoHandshakeSNDandRCV( |
| | | TopologyMsg outTopoMsg, |
| | | TopologyMsg inTopoMsg) |
| | | { |
| | | return groupId; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ", " + |
| | | this.getClass().getSimpleName() + " " + this + ":" + |
| | | "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() + |
| | | "\nAND RECEIVED:\n" + inTopoMsg.toString()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Log the messages involved in the Topology/StartSession handshake. |
| | | * <p> |
| | | * Nothing is traced unless debug is enabled. |
| | | * |
| | | * @param inStartSessionMsg The message received first. |
| | | * @param outTopoMsg The message sent in response. |
| | | */ |
| | | protected void logStartSessionHandshake( |
| | | StartSessionMsg inStartSessionMsg, |
| | | TopologyMsg outTopoMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | // The received message is appended right after the "RECEIVED" header. |
| | | // Plain concatenation is used (rather than toString()) so that a null |
| | | // message cannot raise a NullPointerException from this debug-only path. |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ", " + |
| | | this.getClass().getSimpleName() + " " + this + " :" + |
| | | "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg + |
| | | "\nAND REPLIED:\n" + outTopoMsg.toString()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Traces the StartECLSessionMsg received during the session handshake. |
| | | * Nothing is traced unless debug is enabled. |
| | | * |
| | | * @param inStartECLSessionMsg The message received first. |
| | | */ |
| | | protected void logStartECLSessionHandshake( |
| | | StartECLSessionMsg inStartECLSessionMsg) |
| | | { |
| | | if (!debugEnabled()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ", " + |
| | | this.getClass().getSimpleName() + " " + this + " :" + |
| | | "\nSH SESSION HANDSHAKE RECEIVED:\n" + |
| | | inStartECLSessionMsg.toString()); |
| | | } |
| | | |
| | | } |