From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495, 519
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 3432 +++++++++++++++++-----------------------------------------
1 files changed, 998 insertions(+), 2,434 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a64ea71..63df0d1 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -26,181 +26,217 @@
*/
package org.opends.server.replication.server;
-import org.opends.messages.*;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.StatusMachine.*;
-
-import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import java.io.IOException;
-import java.util.Date;
-import java.util.List;
import java.util.ArrayList;
-import java.util.Map;
import java.util.Random;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
+import org.opends.messages.Message;
import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.HeartbeatThread;
+import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.StartECLSessionMsg;
+import org.opends.server.replication.protocol.StartMsg;
+import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
+import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
/**
- * This class defines a server handler, which handles all interaction with a
- * peer server (RS or DS).
+ * This class defines a server handler :
+ * - that is a MessageHandler (see this class for more details)
+ * - that handles all interaction with a peer server (RS or DS).
*/
-public class ServerHandler extends MonitorProvider<MonitorProviderCfg>
+public abstract class ServerHandler extends MessageHandler
{
-
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
/**
* Time during which the server will wait for existing thread to stop
* during the shutdownWriter.
*/
private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
- /*
- * Properties, filled if remote server is either a DS or a RS
+ /**
+ * Close the session and log the provided error message
+ * Log nothing if message is null.
+ * @param providedSession The provided closing session.
+ * @param providedMsg The provided error message.
+ * @param handler The handler that manages that session.
*/
- private short serverId;
- private ProtocolSession session;
- private final MsgQueue msgQueue = new MsgQueue();
- private MsgQueue lateQueue = new MsgQueue();
- private ReplicationServerDomain replicationServerDomain = null;
- private String serverURL;
+ static protected void closeSession(ProtocolSession providedSession,
+ Message providedMsg, ServerHandler handler)
+ {
+ if (providedMsg != null)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In "
+ + ((handler!=null)?handler.toString():"Replication Server")
+ + " closing session with err=" +
+ providedMsg.toString());
+ logError(providedMsg);
+ }
+ try
+ {
+ if (providedSession!=null)
+ providedSession.close();
+ } catch (IOException ee)
+ {
+ // ignore
+ }
+ }
- // Number of update sent to the server
- private int outCount = 0;
- // Number of updates received from the server
- private int inCount = 0;
+ /**
+ * The serverId of the remote server.
+ */
+ protected short serverId;
+ /**
+ * The session opened with the remote server.
+ */
+ protected ProtocolSession session;
- // Number of updates received from the server in assured safe read mode
- private int assuredSrReceivedUpdates = 0;
- // Number of updates received from the server in assured safe read mode that
- // timed out
- private AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
- // Number of updates sent to the server in assured safe read mode
- private int assuredSrSentUpdates = 0;
- // Number of updates sent to the server in assured safe read mode that timed
- // out
- private AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
- // Number of updates received from the server in assured safe data mode
- private int assuredSdReceivedUpdates = 0;
- // Number of updates received from the server in assured safe data mode that
- // timed out
- private AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
- // Number of updates sent to the server in assured safe data mode
- private int assuredSdSentUpdates = 0;
- // Number of updates sent to the server in assured safe data mode that timed
- // out
- private AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
+ /**
+ * The serverURL of the remote server.
+ */
+ protected String serverURL;
+ /**
+ * Number of updates received from the server in assured safe read mode.
+ */
+ protected int assuredSrReceivedUpdates = 0;
+ /**
+ * Number of updates received from the server in assured safe read mode that
+ * timed out.
+ */
+ protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
+ /**
+ * Number of updates sent to the server in assured safe read mode.
+ */
+ protected int assuredSrSentUpdates = 0;
+ /**
+ * Number of updates sent to the server in assured safe read mode that timed
+ * out.
+ */
+ protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
+ /**
+ // Number of updates received from the server in assured safe data mode.
+ */
+ protected int assuredSdReceivedUpdates = 0;
+ /**
+ * Number of updates received from the server in assured safe data mode that
+ * timed out.
+ */
+ protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
+ /**
+ * Number of updates sent to the server in assured safe data mode.
+ */
+ protected int assuredSdSentUpdates = 0;
- private int maxReceiveQueue = 0;
- private int maxSendQueue = 0;
- private int maxReceiveDelay = 0;
- private int maxSendDelay = 0;
- private int maxQueueSize = 5000;
- private int maxQueueBytesSize = maxQueueSize * 100;
- private int restartReceiveQueue;
- private int restartSendQueue;
- private int restartReceiveDelay;
- private int restartSendDelay;
- private boolean serverIsLDAPserver;
- private boolean following = false;
- private ServerState serverState;
- private boolean activeWriter = true;
- private ServerWriter writer = null;
- private String baseDn = null;
+ /**
+ * Number of updates sent to the server in assured safe data mode that timed
+ * out.
+ */
+ protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
+
+ /**
+ * The associated ServerWriter that sends messages to the remote server.
+ */
+ protected ServerReader reader;
+ /**
+ * The associated ServerReader that receives messages from the remote server.
+ */
+ protected ServerWriter writer = null;
+
+ // window
private int rcvWindow;
private int rcvWindowSizeHalf;
+
private int maxRcvWindow;
- private ServerReader reader;
- private Semaphore sendWindow;
- private int sendWindowSize;
- private boolean flowControl = false; // indicate that the server is
- // flow controlled and should
- // be stopped from sending messages.
+ /**
+ * Semaphore that the writer uses to control the flow to the remote server.
+ */
+ protected Semaphore sendWindow;
+ /**
+ * The initial size of the sending window.
+ */
+ int sendWindowSize;
private int saturationCount = 0;
- private short replicationServerId;
- private short protocolVersion = -1;
- private long generationId = -1;
- // Group id of this remote server
- private byte groupId = (byte) -1;
-
- /*
- * Properties filled only if remote server is a DS
- */
-
- // Status of this DS (only used if this server handler represents a DS)
- private ServerStatus status = ServerStatus.INVALID_STATUS;
- // Referrals URLs this DS is exporting
- private List<String> refUrls = new ArrayList<String>();
- // Assured replication enabled on DS or not
- private boolean assuredFlag = false;
- // DS assured mode (relevant if assured replication enabled)
- private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- // DS safe data level (relevant if assured mode is safe data)
- private byte safeDataLevel = (byte) -1;
-
- /*
- * Properties filled only if remote server is a RS
- */
- private String serverAddressURL;
/**
- * When this Handler is related to a remote replication server
- * this collection will contain as many elements as there are
- * LDAP servers connected to the remote replication server.
+ * The protocol version established with the remote server.
*/
- private final Map<Short, LightweightServerHandler> directoryServers =
- new ConcurrentHashMap<Short, LightweightServerHandler>();
+ protected short protocolVersion = -1;
+ /**
+ * remote generation id.
+ */
+ protected long generationId = -1;
+ /**
+ * The generation id of the hosting RS.
+ */
+ protected long localGenerationId = -1;
+ /**
+ * The generation id before procesing a new start handshake.
+ */
+ protected long oldGenerationId = -1;
+ /**
+ * Group id of this remote server.
+ */
+ protected byte groupId = (byte) -1;
+ /**
+ * The SSL encryption provided by the creator/starter of this handler.
+ */
+ protected boolean initSslEncryption;
+
+ /**
+ * The SSL encryption after the negociation with the peer.
+ */
+ protected boolean sslEncryption;
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
- private long heartbeatInterval = 0;
+ protected long heartbeatInterval = 0;
+
/**
* The thread that will send heartbeats.
*/
HeartbeatThread heartbeatThread = null;
+
/**
* Set when ServerWriter is stopping.
*/
- private boolean shutdownWriter = false;
+ protected boolean shutdownWriter = false;
+
/**
* Set when ServerHandler is stopping.
*/
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
+
/**
* Creates a new server handler instance with the provided socket.
*
@@ -208,846 +244,142 @@
* communicate with the remote entity.
* @param queueSize The maximum number of update that will be kept
* in memory by this ServerHandler.
+ * @param replicationServerURL The URL of the hosting replication server.
+ * @param replicationServerId The serverId of the hosting replication server.
+ * @param replicationServer The hosting replication server.
+ * @param rcvWindowSize The window size to receive from the remote server.
*/
- public ServerHandler(ProtocolSession session, int queueSize)
+ public ServerHandler(
+ ProtocolSession session,
+ int queueSize,
+ String replicationServerURL,
+ short replicationServerId,
+ ReplicationServer replicationServer,
+ int rcvWindowSize)
{
- super("Server Handler");
+ super(queueSize, replicationServerURL,
+ replicationServerId, replicationServer);
this.session = session;
- this.maxQueueSize = queueSize;
- this.maxQueueBytesSize = queueSize * 100;
this.protocolVersion = ProtocolVersion.getCurrentVersion();
+ this.rcvWindowSizeHalf = rcvWindowSize / 2;
+ this.maxRcvWindow = rcvWindowSize;
+ this.rcvWindow = rcvWindowSize;
}
/**
- * Creates a DSInfo structure representing this remote DS.
- * @return The DSInfo structure representing this remote DS
+ * Abort a start procedure currently establishing.
+ * @param reason The provided reason.
*/
- public DSInfo toDSInfo()
+ protected void abortStart(Message reason)
{
- DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
- status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls);
-
- return dsInfo;
- }
-
- /**
- * Creates a RSInfo structure representing this remote RS.
- * @return The RSInfo structure representing this remote RS
- */
- public RSInfo toRSInfo()
- {
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
-
- return rsInfo;
- }
-
- /**
- * Do the handshake with either the DS or RS and then create the reader and
- * writer thread.
- *
- * There are 2 possible handshake sequences: DS<->RS and RS<->RS. Each one are
- * divided into 2 logical consecutive phases (phase 1 and phase 2):
- *
- * DS<->RS (DS (always initiating connection) always sends first message):
- * -------
- *
- * phase 1:
- * DS --- ServerStartMsg ---> RS
- * DS <--- ReplServerStartMsg --- RS
- * phase 2:
- * DS --- StartSessionMsg ---> RS
- * DS <--- TopologyMsg --- RS
- *
- * RS<->RS (RS initiating connection always sends first message):
- * -------
- *
- * phase 1:
- * RS1 --- ReplServerStartMsg ---> RS2
- * RS1 <--- ReplServerStartMsg --- RS2
- * phase 2:
- * RS1 --- TopologyMsg ---> RS2
- * RS1 <--- TopologyMsg --- RS2
- *
- * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
- * null if this is an incoming connection (listen).
- * @param replicationServerId The identifier of the replicationServer that
- * creates this server handler.
- * @param replicationServerURL The URL of the replicationServer that creates
- * this server handler.
- * @param windowSize the window size that this server handler must use.
- * @param sslEncryption For outgoing connections indicates whether encryption
- * should be used after the exchange of start messages.
- * Ignored for incoming connections.
- * @param replicationServer the ReplicationServer that created this server
- * handler.
- */
- public void start(String baseDn, short replicationServerId,
- String replicationServerURL,
- int windowSize, boolean sslEncryption,
- ReplicationServer replicationServer)
- {
-
- // The handshake phase must be done by blocking any access to structures
- // keeping info on connected servers, so that one can safely check for
- // pre-existence of a server, send a coherent snapshot of known topology
- // to peers, update the local view of the topology...
- //
- // For instance a kind of problem could be that while we connect with a
- // peer RS, a DS is connecting at the same time and we could publish the
- // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
- //
- // This method and every others that need to read/make changes to the
- // structures holding topology for the domain should:
- // - call ReplicationServerDomain.lock()
- // - read/modify structures
- // - call ReplicationServerDomain.release()
- //
- // More information is provided in comment of ReplicationServerDomain.lock()
-
- // If domain already exists, lock it until handshake is finished otherwise
- // it will be created and locked later in the method
- if (baseDn != null)
+ // We did not recognize the message, close session as what
+ // can happen after is undetermined and we do not want the server to
+ // be disturbed
+ if (session!=null)
{
- ReplicationServerDomain rsd =
- replicationServer.getReplicationServerDomain(baseDn, false);
- if (rsd != null)
+ try
{
- try
- {
- rsd.lock();
- } catch (InterruptedException ex)
- {
- // Thread interrupted, return.
- return;
- }
+ session.publish(
+ new ErrorMsg(
+ replicationServerDomain.getReplicationServer().getServerId(),
+ serverId,
+ reason));
}
+ catch(Exception e)
+ {
+ }
+ closeSession(session, reason, this);
}
- long oldGenerationId = -100;
+ if ((replicationServerDomain != null) &&
+ replicationServerDomain.hasLock())
+ replicationServerDomain.release();
- if (debugEnabled())
- TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
- " starts a new LS or RS " +
- ((baseDn == null) ? "incoming connection" : "outgoing connection"));
-
- this.replicationServerId = replicationServerId;
- rcvWindowSizeHalf = windowSize / 2;
- maxRcvWindow = windowSize;
- rcvWindow = windowSize;
- long localGenerationId = -1;
- ReplServerStartMsg outReplServerStartMsg = null;
-
- /**
- * This boolean prevents from logging a polluting error when connection\
- * aborted from a DS that wanted only to perform handshake phase 1 in order
- * to determine the best suitable RS:
- * 1) -> ServerStartMsg
- * 2) <- ReplServerStartMsg
- * 3) connection closure
- */
- boolean log_error_message = true;
-
- try
+ // If generation id of domain was changed, set it back to old value
+ // We may have changed it as it was -1 and we received a value >0 from
+ // peer server and the last topo message sent may have failed being
+ // sent: in that case retrieve old value of generation id for
+ // replication server domain
+ if (oldGenerationId != -100)
{
- /*
- * PROCEDE WITH FIRST PHASE OF HANDSHAKE:
- * ServerStartMsg then ReplServerStartMsg (with a DS)
- * OR
- * ReplServerStartMsg then ReplServerStartMsg (with a RS)
- */
+ replicationServerDomain.setGenerationId(oldGenerationId, false);
+ }
+ }
- if (baseDn != null) // Outgoing connection
-
+ /**
+ * Check the protocol window and send WindowMsg if necessary.
+ *
+ * @throws IOException when the session becomes unavailable.
+ */
+ public synchronized void checkWindow() throws IOException
+ {
+ if (rcvWindow < rcvWindowSizeHalf)
+ {
+ if (flowControl)
{
- // This is an outgoing connection. Publish our start message.
- this.baseDn = baseDn;
-
- // Get or create the ReplicationServerDomain
- replicationServerDomain =
- replicationServer.getReplicationServerDomain(baseDn, true);
- if (!replicationServerDomain.hasLock())
+ if (replicationServerDomain.restartAfterSaturation(this))
{
- try
- {
- replicationServerDomain.lock();
- } catch (InterruptedException ex)
- {
- // Thread interrupted, return.
- return;
- }
+ flowControl = false;
}
- localGenerationId = replicationServerDomain.getGenerationId();
+ }
+ if (!flowControl)
+ {
+ WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
+ session.publish(msg);
+ rcvWindow += rcvWindowSizeHalf;
+ }
+ }
+ }
- ServerState localServerState =
- replicationServerDomain.getDbServerState();
- outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
- replicationServerURL,
- baseDn, windowSize, localServerState,
- protocolVersion, localGenerationId,
- sslEncryption,
- replicationServer.getGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
+ /**
+ * Decrement the protocol window, then check if it is necessary
+ * to send a WindowMsg and send it.
+ *
+ * @throws IOException when the session becomes unavailable.
+ */
+ public synchronized void decAndCheckWindow() throws IOException
+ {
+ rcvWindow--;
+ checkWindow();
+ }
- session.publish(outReplServerStartMsg);
+ /**
+ * Set the shut down flag to true and returns the previous value of the flag.
+ * @return The previous value of the shut down flag
+ */
+ public boolean engageShutdown()
+ {
+ // Use thread safe boolean
+ return shuttingDown.getAndSet(true);
+ }
+
+ /**
+ * Finalize the initialization, create reader, writer, heartbeat system
+ * and monitoring system.
+ * @throws DirectoryException When an exception is raised.
+ */
+ protected void finalizeStart()
+ throws DirectoryException
+ {
+ // FIXME:ECL We should refactor so that a SH always have a session
+ if (session != null)
+ {
+ try
+ {
+ // Disable timeout for next communications
+ session.setSoTimeout(0);
+ }
+ catch(Exception e)
+ {
}
- // Wait and process ServerStartMsg or ReplServerStartMsg
- ReplicationMsg msg = session.receive();
- if (msg instanceof ServerStartMsg)
- {
- // The remote server is an LDAP Server.
- ServerStartMsg serverStartMsg = (ServerStartMsg) msg;
-
- generationId = serverStartMsg.getGenerationId();
- protocolVersion = ProtocolVersion.minWithCurrent(
- serverStartMsg.getVersion());
- serverId = serverStartMsg.getServerId();
- serverURL = serverStartMsg.getServerURL();
- this.baseDn = serverStartMsg.getBaseDn();
- this.serverState = serverStartMsg.getServerState();
- this.groupId = serverStartMsg.getGroupId();
-
- maxReceiveDelay = serverStartMsg.getMaxReceiveDelay();
- maxReceiveQueue = serverStartMsg.getMaxReceiveQueue();
- maxSendDelay = serverStartMsg.getMaxSendDelay();
- maxSendQueue = serverStartMsg.getMaxSendQueue();
- heartbeatInterval = serverStartMsg.getHeartbeatInterval();
-
- // The session initiator decides whether to use SSL.
- sslEncryption = serverStartMsg.getSSLEncryption();
-
- if (maxReceiveQueue > 0)
- restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue -
- 200 : maxReceiveQueue * 8 / 10);
- else
- restartReceiveQueue = 0;
-
- if (maxSendQueue > 0)
- restartSendQueue =
- (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 /
- 10);
- else
- restartSendQueue = 0;
-
- if (maxReceiveDelay > 0)
- restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1
- : maxReceiveDelay);
- else
- restartReceiveDelay = 0;
-
- if (maxSendDelay > 0)
- restartSendDelay =
- (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay);
- else
- restartSendDelay = 0;
-
- if (heartbeatInterval < 0)
- {
- heartbeatInterval = 0;
- }
-
- serverIsLDAPserver = true;
-
- // Get or Create the ReplicationServerDomain
- replicationServerDomain =
- replicationServer.getReplicationServerDomain(this.baseDn, true);
-
- // Hack to be sure that if a server disconnects and reconnect, we
- // let the reader thread see the closure and cleanup any reference
- // to old connection
- replicationServerDomain.waitDisconnection(serverStartMsg.getServerId());
-
- if (!replicationServerDomain.hasLock())
- {
- try
- {
- replicationServerDomain.lock();
- } catch (InterruptedException ex)
- {
- // Thread interrupted, return.
- return;
- }
- }
-
- // Duplicate server ?
- if (!replicationServerDomain.checkForDuplicateDS(this))
- {
- closeSession(null);
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- return;
- }
-
- localGenerationId = replicationServerDomain.getGenerationId();
-
- ServerState localServerState =
- replicationServerDomain.getDbServerState();
- // This an incoming connection. Publish our start message
- ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(replicationServerId, replicationServerURL,
- this.baseDn, windowSize, localServerState,
- protocolVersion, localGenerationId,
- sslEncryption,
- replicationServer.getGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
- session.publish(replServerStartMsg);
- sendWindowSize = serverStartMsg.getWindowSize();
-
- /* Until here session is encrypted then it depends on the
- negotiation */
- if (!sslEncryption)
- {
- session.stopEncryption();
- }
-
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" +
- "\nSH HANDSHAKE RECEIVED:\n" + serverStartMsg.toString() +
- "\nAND REPLIED:\n" + replServerStartMsg.toString());
- }
- } else if (msg instanceof ReplServerStartMsg)
- {
- // The remote server is a replication server
- ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) msg;
- protocolVersion = ProtocolVersion.minWithCurrent(
- inReplServerStartMsg.getVersion());
- generationId = inReplServerStartMsg.getGenerationId();
- serverId = inReplServerStartMsg.getServerId();
- serverURL = inReplServerStartMsg.getServerURL();
- int separator = serverURL.lastIndexOf(':');
- serverAddressURL =
- session.getRemoteAddress() + ":" + serverURL.substring(separator +
- 1);
- serverIsLDAPserver = false;
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- // We support connection from a V1 RS
- // Only V2 protocol has the group id in repl server start message
- this.groupId = inReplServerStartMsg.getGroupId();
- }
- this.baseDn = inReplServerStartMsg.getBaseDn();
-
- if (baseDn == null) // Reply to incoming RS
-
- {
- // Get or create the ReplicationServerDomain
- replicationServerDomain =
- replicationServer.getReplicationServerDomain(this.baseDn, true);
- if (!replicationServerDomain.hasLock())
- {
- try
- {
- /**
- * Take the lock on the domain.
- * WARNING: Here we try to acquire the lock with a timeout. This
- * is for preventing a deadlock that may happen if there are cross
- * connection attempts (for same domain) from this replication
- * server and from a peer one:
- * Here is the scenario:
- * - RS1 connect thread takes the domain lock and starts
- * connection to RS2
- * - at the same time RS2 connect thread takes his domain lock and
- * start connection to RS2
- * - RS2 listen thread starts processing received
- * ReplServerStartMsg from RS1 and wants to acquire the lock on
- * the domain (here) but cannot as RS2 connect thread already has
- * it
- * - RS1 listen thread starts processing received
- * ReplServerStartMsg from RS2 and wants to acquire the lock on
- * the domain (here) but cannot as RS1 connect thread already has
- * it
- * => Deadlock: 4 threads are locked.
- * So to prevent that in such situation, the listen threads here
- * will both timeout trying to acquire the lock. The random time
- * for the timeout should allow on connection attempt to be
- * aborted whereas the other one should have time to finish in the
- * same time.
- * Warning: the minimum time (3s) should be big enough to allow
- * normal situation connections to terminate. The added random
- * time should represent a big enough range so that the chance to
- * have one listen thread timing out a lot before the peer one is
- * great. When the first listen thread times out, the remote
- * connect thread should release the lock and allow the peer
- * listen thread to take the lock it was waiting for and process
- * the connection attempt.
- */
- Random random = new Random();
- int randomTime = random.nextInt(6); // Random from 0 to 5
- // Wait at least 3 seconds + (0 to 5 seconds)
- long timeout = (long) (3000 + ( randomTime * 1000 ) );
- boolean noTimeout = replicationServerDomain.tryLock(timeout);
- if (!noTimeout)
- {
- // Timeout
- Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
- this.baseDn,
- Short.toString(serverId),
- Short.toString(replicationServer.getServerId()));
- closeSession(message);
- return;
- }
- } catch (InterruptedException ex)
- {
- // Thread interrupted, return.
- return;
- }
- }
- localGenerationId = replicationServerDomain.getGenerationId();
- ServerState domServerState =
- replicationServerDomain.getDbServerState();
-
- // The session initiator decides whether to use SSL.
- sslEncryption = inReplServerStartMsg.getSSLEncryption();
-
- // Publish our start message
- outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
- replicationServerURL,
- this.baseDn, windowSize, domServerState,
- protocolVersion,
- localGenerationId,
- sslEncryption,
- replicationServer.getGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- session.publish(outReplServerStartMsg);
- } else {
- // We support connection from a V1 RS, send PDU with V1 form
- session.publish(outReplServerStartMsg,
- ProtocolVersion.REPLICATION_PROTOCOL_V1);
- }
-
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" +
- "\nSH HANDSHAKE RECEIVED:\n" + inReplServerStartMsg.toString() +
- "\nAND REPLIED:\n" + outReplServerStartMsg.toString());
- }
- } else
- {
- // Did the remote RS answer with the DN we provided him ?
- if (!(this.baseDn.equals(baseDn)))
- {
- Message message = ERR_RS_DN_DOES_NOT_MATCH.get(
- this.baseDn.toString(),
- baseDn.toString());
- closeSession(message);
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- return;
- }
-
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" +
- "\nSH HANDSHAKE SENT:\n" + outReplServerStartMsg.toString() +
- "\nAND RECEIVED:\n" + inReplServerStartMsg.toString());
- }
- }
- this.serverState = inReplServerStartMsg.getServerState();
- sendWindowSize = inReplServerStartMsg.getWindowSize();
-
- // Duplicate server ?
- if (!replicationServerDomain.checkForDuplicateRS(this))
- {
- closeSession(null);
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- return;
- }
-
- /* Until here session is encrypted then it depends on the
- negociation */
- if (!sslEncryption)
- {
- session.stopEncryption();
- }
- } else
- {
- // We did not recognize the message, close session as what
- // can happen after is undetermined and we do not want the server to
- // be disturbed
- closeSession(null);
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- return;
- }
-
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
- { // Only protocol version above V1 has a phase 2 handshake
-
- /*
- * NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE:
- * TopologyMsg then TopologyMsg (with a RS)
- * OR
- * StartSessionMsg then TopologyMsg (with a DS)
- */
-
- TopologyMsg outTopoMsg = null;
-
- if (baseDn != null) // Outgoing connection to a RS
-
- {
- // Send our own TopologyMsg to remote RS
- outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
- session.publish(outTopoMsg);
- }
-
- // Wait and process TopologyMsg or StartSessionMsg
- log_error_message = false;
- ReplicationMsg msg2 = session.receive();
- log_error_message = true;
- if (msg2 instanceof TopologyMsg)
- {
- // Remote RS sent his topo msg
- TopologyMsg inTopoMsg = (TopologyMsg) msg2;
-
- // CONNECTION WITH A RS
-
- // if the remote RS and the local RS have the same genID
- // then it's ok and nothing else to do
- if (generationId == localGenerationId)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + " RS with serverID=" + serverId +
- " is connected with the right generation ID");
- }
- } else
- {
- if (localGenerationId > 0)
- {
- // if the local RS is initialized
- if (generationId > 0)
- {
- // if the remote RS is initialized
- if (generationId != localGenerationId)
- {
- // if the 2 RS have different generationID
- if (replicationServerDomain.getGenerationIdSavedStatus())
- {
- // if the present RS has received changes regarding its
- // gen ID and so won't change without a reset
- // then we are just degrading the peer.
- Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn,
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(localGenerationId));
- logError(message);
- } else
- {
- // The present RS has never received changes regarding its
- // gen ID.
- //
- // Example case:
- // - we are in RS1
- // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
- // - RS1 has genId1 from LS1 /genId1 comes from data in
- // suffix
- // - we are in RS1 and we receive a START msg from RS2
- // - Each RS keeps its genID / is degraded and when LS2
- // will be populated from LS1 everything will become ok.
- //
- // Issue:
- // FIXME : Would it be a good idea in some cases to just
- // set the gen ID received from the peer RS
- // specially if the peer has a non null state and
- // we have a nul state ?
- // replicationServerDomain.
- // setGenerationId(generationId, false);
- Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn,
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(localGenerationId));
- logError(message);
- }
- }
- } else
- {
- // The remote RS has no genId. We don't change anything for the
- // current RS.
- }
- } else
- {
- // The local RS is not initialized - take the one received
- // WARNING: Must be done before computing topo message to send
- // to peer server as topo message must embed valid generation id
- // for our server
- oldGenerationId =
- replicationServerDomain.setGenerationId(generationId, false);
- }
- }
-
- if (baseDn == null) // Reply to the RS (incoming connection)
-
- {
- // Send our own TopologyMsg to remote RS
- outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
- session.publish(outTopoMsg);
-
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" +
- "\nSH HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
- "\nAND REPLIED:\n" + outTopoMsg.toString());
- }
- } else
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" +
- "\nSH HANDSHAKE SENT:\n" + outTopoMsg.toString() +
- "\nAND RECEIVED:\n" + inTopoMsg.toString());
- }
- }
-
- // Alright, connected with new RS (either outgoing or incoming
- // connection): store handler.
- Map<Short, ServerHandler> connectedRSs =
- replicationServerDomain.getConnectedRSs();
- connectedRSs.put(serverId, this);
-
- // Process TopologyMsg sent by remote RS: store matching new info
- // (this will also warn our connected DSs of the new received info)
- replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
-
- } else if (msg2 instanceof StartSessionMsg)
- {
- // CONNECTION WITH A DS
-
- // Process StartSessionMsg sent by remote DS
- StartSessionMsg startSessionMsg = (StartSessionMsg) msg2;
-
- this.status = startSessionMsg.getStatus();
- // Sanity check: is it a valid initial status?
- if (!isValidInitialStatus(this.status))
- {
- Message mesg = ERR_RS_INVALID_INIT_STATUS.get(
- this.status.toString(), this.baseDn.toString(),
- Short.toString(serverId));
- closeSession(mesg);
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- return;
- }
- this.refUrls = startSessionMsg.getReferralsURLs();
- this.assuredFlag = startSessionMsg.isAssured();
- this.assuredMode = startSessionMsg.getAssuredMode();
- this.safeDataLevel = startSessionMsg.getSafeDataLevel();
-
- /*
- * If we have already a generationID set for the domain
- * then
- * if the connecting replica has not the same
- * then it is degraded locally and notified by an error message
- * else
- * we set the generationID from the one received
- * (unsaved yet on disk . will be set with the 1rst change
- * received)
- */
- if (localGenerationId > 0)
- {
- if (generationId != localGenerationId)
- {
- Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
- this.baseDn,
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(localGenerationId));
- logError(message);
- }
- } else
- {
- // We are an empty Replicationserver
- if ((generationId > 0) && (!serverState.isEmpty()))
- {
- // If the LDAP server has already sent changes
- // it is not expected to connect to an empty RS
- Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
- this.baseDn,
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(localGenerationId));
- logError(message);
- } else
- {
- // The local RS is not initialized - take the one received
- // WARNING: Must be done before computing topo message to send
- // to peer server as topo message must embed valid generation id
- // for our server
- oldGenerationId =
- replicationServerDomain.setGenerationId(generationId, false);
- }
- }
-
- // Send our own TopologyMsg to DS
- outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
- this.serverId);
- session.publish(outTopoMsg);
-
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" +
- "\nSH HANDSHAKE RECEIVED:\n" + startSessionMsg.toString() +
- "\nAND REPLIED:\n" + outTopoMsg.toString());
- }
-
- // Alright, connected with new DS: store handler.
- Map<Short, ServerHandler> connectedDSs =
- replicationServerDomain.getConnectedDSs();
- connectedDSs.put(serverId, this);
-
- // Tell peer DSs a new DS just connected to us
- // No need to resend topo msg to this just new DS so not null
- // argument
- replicationServerDomain.sendTopoInfoToDSs(this);
- // Tell peer RSs a new DS just connected to us
- replicationServerDomain.sendTopoInfoToRSs();
- } else
- {
- // We did not recognize the message, close session as what
- // can happen after is undetermined and we do not want the server to
- // be disturbed
- closeSession(null);
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- return;
- }
- } else
- {
- // Terminate connection from a V1 RS
-
- // if the remote RS and the local RS have the same genID
- // then it's ok and nothing else to do
- if (generationId == localGenerationId)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + " RS V1 with serverID=" + serverId +
- " is connected with the right generation ID");
- }
- } else
- {
- if (localGenerationId > 0)
- {
- // if the local RS is initialized
- if (generationId > 0)
- {
- // if the remote RS is initialized
- if (generationId != localGenerationId)
- {
- // if the 2 RS have different generationID
- if (replicationServerDomain.getGenerationIdSavedStatus())
- {
- // if the present RS has received changes regarding its
- // gen ID and so won't change without a reset
- // then we are just degrading the peer.
- Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn,
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(localGenerationId));
- logError(message);
- } else
- {
- // The present RS has never received changes regarding its
- // gen ID.
- //
- // Example case:
- // - we are in RS1
- // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
- // - RS1 has genId1 from LS1 /genId1 comes from data in
- // suffix
- // - we are in RS1 and we receive a START msg from RS2
- // - Each RS keeps its genID / is degraded and when LS2
- // will be populated from LS1 everything will become ok.
- //
- // Issue:
- // FIXME : Would it be a good idea in some cases to just
- // set the gen ID received from the peer RS
- // specially if the peer has a non null state and
- // we have a nul state ?
- // replicationServerDomain.
- // setGenerationId(generationId, false);
- Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn,
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(localGenerationId));
- logError(message);
- }
- }
- } else
- {
- // The remote RS has no genId. We don't change anything for the
- // current RS.
- }
- } else
- {
- // The local RS is not initialized - take the one received
- oldGenerationId =
- replicationServerDomain.setGenerationId(generationId, false);
- }
- }
-
- // Alright, connected with new incoming V1 RS: store handler.
- Map<Short, ServerHandler> connectedRSs =
- replicationServerDomain.getConnectedRSs();
- connectedRSs.put(serverId, this);
-
- // Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1
- // all the servers of the topology. We prefer not not send a TopologyMsg
- // for giving partial/false information to the V2 servers as for
- // instance we don't have the connected DS of the V1 RS...When the V1
- // RS will be upgraded in his turn, topo info will be sent and accurate.
- // That way, there is no risk to have false/incomplete information in
- // other servers.
- }
-
- /*
- * FINALIZE INITIALIZATION:
- * CREATE READER AND WRITER, HEARTBEAT SYSTEM AND UPDATE MONITORING
- * SYSTEM
- */
-
- // Disable timeout for next communications
- session.setSoTimeout(0);
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
writer = new ServerWriter(session, serverId,
- this, replicationServerDomain);
+ this, replicationServerDomain);
reader = new ServerReader(session, serverId,
- this, replicationServerDomain);
+ this, replicationServerDomain);
reader.start();
writer.start();
@@ -1056,440 +388,30 @@
if (heartbeatInterval > 0)
{
heartbeatThread = new HeartbeatThread(
- "Replication Heartbeat to DS " + serverURL + " " + serverId +
- " for " + this.baseDn + " in RS " + replicationServerId,
- session, heartbeatInterval / 3);
+ "Replication Heartbeat to " + this +
+ " in RS " + replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName(),
+ session, heartbeatInterval / 3);
heartbeatThread.start();
}
-
- // Create the status analyzer for the domain if not already started
- if (serverIsLDAPserver)
- {
- if (!replicationServerDomain.isRunningStatusAnalyzer())
- {
- if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.
- getReplicationServer().
- getMonitorInstanceName() +
- " SH for remote server " + this.getMonitorInstanceName() +
- " is starting status analyzer");
- replicationServerDomain.startStatusAnalyzer();
- }
- }
-
- DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
- DirectoryServer.registerMonitorProvider(this);
- } catch (NotSupportedOldVersionPDUException e)
- {
- // We do not need to support DS V1 connection, we just accept RS V1
- // connection:
- // We just trash the message, log the event for debug purpose and close
- // the connection
- if (debugEnabled())
- TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ":"
- + e.getMessage());
- closeSession(null);
- } catch (Exception e)
- {
- // We do not want polluting error log if error is due to normal session
- // aborted after handshake phase one from a DS that is searching for best
- // suitable RS.
- if ( log_error_message || (baseDn != null) )
- {
- // some problem happened, reject the connection
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_REPLICATION_SERVER_CONNECTION_ERROR.get(
- this.getMonitorInstanceName()));
- mb.append(": " + stackTraceToSingleLineString(e));
- closeSession(mb.toMessage());
- } else
- {
- closeSession(null);
- }
-
- // If generation id of domain was changed, set it back to old value
- // We may have changed it as it was -1 and we received a value >0 from
- // peer server and the last topo message sent may have failed being
- // sent: in that case retrieve old value of generation id for
- // replication server domain
- if (oldGenerationId != -100)
- {
- replicationServerDomain.setGenerationId(oldGenerationId, false);
- }
}
- // Release domain
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
- replicationServerDomain.release();
- }
-
- /*
- * Close the session logging the passed error message
- * Log nothing if message is null.
- */
- private void closeSession(Message msg)
- {
- if (msg != null)
- {
- logError(msg);
- }
- try
- {
- session.close();
- } catch (IOException ee)
- {
- // ignore
- }
+ DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+ DirectoryServer.registerMonitorProvider(this);
}
/**
- * get the Server Id.
+ * Sends a message containing a generationId to a peer server.
+ * The peer is expected to be a replication server.
*
- * @return the ID of the server to which this object is linked
- */
- public short getServerId()
- {
- return serverId;
- }
-
- /**
- * Retrieves the Address URL for this server handler.
+ * @param msg The GenerationIdMessage message to be sent.
+ * @throws IOException When it occurs while sending the message,
*
- * @return The Address URL for this server handler,
- * in the form of an IP address and port separated by a colon.
*/
- public String getServerAddressURL()
+ public void forwardGenerationIdToRS(ResetGenerationIdMsg msg)
+ throws IOException
{
- return serverAddressURL;
- }
-
- /**
- * Retrieves the URL for this server handler.
- *
- * @return The URL for this server handler, in the form of an address and
- * port separated by a colon.
- */
- public String getServerURL()
- {
- return serverURL;
- }
-
- /**
- * Increase the counter of updates sent to the server.
- */
- public void incrementOutCount()
- {
- outCount++;
- }
-
- /**
- * Increase the counter of update received from the server.
- */
- public void incrementInCount()
- {
- inCount++;
- }
-
- /**
- * Get the count of updates received from the server.
- * @return the count of update received from the server.
- */
- public int getInCount()
- {
- return inCount;
- }
-
- /**
- * Get the count of updates sent to this server.
- * @return The count of update sent to this server.
- */
- public int getOutCount()
- {
- return outCount;
- }
-
- /**
- * Get the number of updates received from the server in assured safe read
- * mode.
- * @return The number of updates received from the server in assured safe read
- * mode
- */
- public int getAssuredSrReceivedUpdates()
- {
- return assuredSrReceivedUpdates;
- }
-
- /**
- * Get the number of updates received from the server in assured safe read
- * mode that timed out.
- * @return The number of updates received from the server in assured safe read
- * mode that timed out.
- */
- public AtomicInteger getAssuredSrReceivedUpdatesTimeout()
- {
- return assuredSrReceivedUpdatesTimeout;
- }
-
- /**
- * Get the number of updates sent to the server in assured safe read mode.
- * @return The number of updates sent to the server in assured safe read mode
- */
- public int getAssuredSrSentUpdates()
- {
- return assuredSrSentUpdates;
- }
-
- /**
- * Get the number of updates sent to the server in assured safe read mode that
- * timed out.
- * @return The number of updates sent to the server in assured safe read mode
- * that timed out.
- */
- public AtomicInteger getAssuredSrSentUpdatesTimeout()
- {
- return assuredSrSentUpdatesTimeout;
- }
-
- /**
- * Get the number of updates received from the server in assured safe data
- * mode.
- * @return The number of updates received from the server in assured safe data
- * mode
- */
- public int getAssuredSdReceivedUpdates()
- {
- return assuredSdReceivedUpdates;
- }
-
- /**
- * Get the number of updates received from the server in assured safe data
- * mode that timed out.
- * @return The number of updates received from the server in assured safe data
- * mode that timed out.
- */
- public AtomicInteger getAssuredSdReceivedUpdatesTimeout()
- {
- return assuredSdReceivedUpdatesTimeout;
- }
-
- /**
- * Get the number of updates sent to the server in assured safe data mode.
- * @return The number of updates sent to the server in assured safe data mode
- */
- public int getAssuredSdSentUpdates()
- {
- return assuredSdSentUpdates;
- }
-
- /**
- * Get the number of updates sent to the server in assured safe data mode that
- * timed out.
- * @return The number of updates sent to the server in assured safe data mode
- * that timed out.
- */
- public AtomicInteger getAssuredSdSentUpdatesTimeout()
- {
- return assuredSdSentUpdatesTimeout;
- }
-
- /**
- * Increment the number of updates received from the server in assured safe
- * read mode.
- */
- public void incrementAssuredSrReceivedUpdates()
- {
- assuredSrReceivedUpdates++;
- }
-
- /**
- * Increment the number of updates received from the server in assured safe
- * read mode that timed out.
- */
- public void incrementAssuredSrReceivedUpdatesTimeout()
- {
- assuredSrReceivedUpdatesTimeout.incrementAndGet();
- }
-
- /**
- * Increment the number of updates sent to the server in assured safe read
- * mode.
- */
- public void incrementAssuredSrSentUpdates()
- {
- assuredSrSentUpdates++;
- }
-
- /**
- * Increment the number of updates sent to the server in assured safe read
- * mode that timed out.
- */
- public void incrementAssuredSrSentUpdatesTimeout()
- {
- assuredSrSentUpdatesTimeout.incrementAndGet();
- }
-
- /**
- * Increment the number of updates received from the server in assured safe
- * data mode.
- */
- public void incrementAssuredSdReceivedUpdates()
- {
- assuredSdReceivedUpdates++;
- }
-
- /**
- * Increment the number of updates received from the server in assured safe
- * data mode that timed out.
- */
- public void incrementAssuredSdReceivedUpdatesTimeout()
- {
- assuredSdReceivedUpdatesTimeout.incrementAndGet();
- }
-
- /**
- * Increment the number of updates sent to the server in assured safe data
- * mode.
- */
- public void incrementAssuredSdSentUpdates()
- {
- assuredSdSentUpdates++;
- }
-
- /**
- * Increment the number of updates sent to the server in assured safe data
- * mode that timed out.
- */
- public void incrementAssuredSdSentUpdatesTimeout()
- {
- assuredSdSentUpdatesTimeout.incrementAndGet();
- }
-
- /**
- * Check is this server is saturated (this server has already been
- * sent a bunch of updates and has not processed them so they are staying
- * in the message queue for this server an the size of the queue
- * for this server is above the configured limit.
- *
- * The limit can be defined in number of updates or with a maximum delay
- *
- * @param changeNumber The changenumber to use to make the delay calculations.
- * @param sourceHandler The ServerHandler which is sending the update.
- * @return true is saturated false if not saturated.
- */
- public boolean isSaturated(ChangeNumber changeNumber,
- ServerHandler sourceHandler)
- {
- synchronized (msgQueue)
- {
- int size = msgQueue.count();
-
- if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
- return true;
-
- if ((sourceHandler.maxSendQueue > 0) &&
- (size >= sourceHandler.maxSendQueue))
- return true;
-
- if (!msgQueue.isEmpty())
- {
- UpdateMsg firstUpdate = msgQueue.first();
-
- if (firstUpdate != null)
- {
- long timeDiff = changeNumber.getTimeSec() -
- firstUpdate.getChangeNumber().getTimeSec();
-
- if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
- return true;
-
- if ((sourceHandler.maxSendDelay > 0) &&
- (timeDiff >= sourceHandler.maxSendDelay))
- return true;
- }
- }
- return false;
- }
- }
-
- /**
- * Check that the size of the Server Handler messages Queue has lowered
- * below the limit and therefore allowing the reception of messages
- * from other servers to restart.
- * @param source The ServerHandler which was sending the update.
- * can be null.
- * @return true if the processing can restart
- */
- public boolean restartAfterSaturation(ServerHandler source)
- {
- synchronized (msgQueue)
- {
- int queueSize = msgQueue.count();
- if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
- return false;
- if ((source != null) && (source.maxSendQueue > 0) &&
- (queueSize >= source.restartSendQueue))
- return false;
-
- if (!msgQueue.isEmpty())
- {
- UpdateMsg firstUpdate = msgQueue.first();
- UpdateMsg lastUpdate = msgQueue.last();
-
- if ((firstUpdate != null) && (lastUpdate != null))
- {
- long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
- firstUpdate.getChangeNumber().getTimeSec();
- if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
- return false;
- if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >=
- source.restartSendDelay))
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * Check if the server associated to this ServerHandler is a replication
- * server.
- * @return true if the server associated to this ServerHandler is a
- * replication server.
- */
- public boolean isReplicationServer()
- {
- return (!serverIsLDAPserver);
- }
-
- /**
- * Get the number of message in the receive message queue.
- * @return Size of the receive message queue.
- */
- public int getRcvMsgQueueSize()
- {
- synchronized (msgQueue)
- {
- /*
- * When the server is up to date or close to be up to date,
- * the number of updates to be sent is the size of the receive queue.
- */
- if (isFollowing())
- return msgQueue.count();
- else
- {
- /**
- * When the server is not able to follow, the msgQueue
- * may become too large and therefore won't contain all the
- * changes. Some changes may only be stored in the backing DB
- * of the servers.
- * The total size of the receive queue is calculated by doing
- * the sum of the number of missing changes for every dbHandler.
- */
- ServerState dbState = replicationServerDomain.getDbServerState();
- return ServerState.diffChanges(dbState, serverState);
- }
- }
+ session.publish(msg);
}
/**
@@ -1536,478 +458,133 @@
}
/**
- * Get the older update time for that server.
- * @return The older update time.
+ * Get the number of updates received from the server in assured safe data
+ * mode.
+ * @return The number of updates received from the server in assured safe data
+ * mode
*/
- public long getOlderUpdateTime()
+ public int getAssuredSdReceivedUpdates()
{
- ChangeNumber olderUpdateCN = getOlderUpdateCN();
- if (olderUpdateCN == null)
- return 0;
- return olderUpdateCN.getTime();
+ return assuredSdReceivedUpdates;
}
/**
- * Get the older Change Number for that server.
- * Returns null when the queue is empty.
- * @return The older change number.
+ * Get the number of updates received from the server in assured safe data
+ * mode that timed out.
+ * @return The number of updates received from the server in assured safe data
+ * mode that timed out.
*/
- public ChangeNumber getOlderUpdateCN()
+ public AtomicInteger getAssuredSdReceivedUpdatesTimeout()
{
- ChangeNumber result = null;
- synchronized (msgQueue)
- {
- if (isFollowing())
- {
- if (msgQueue.isEmpty())
- {
- result = null;
- } else
- {
- UpdateMsg msg = msgQueue.first();
- result = msg.getChangeNumber();
- }
- } else
- {
- if (lateQueue.isEmpty())
- {
- // isFollowing is false AND lateQueue is empty
- // We may be at the very moment when the writer has emptyed the
- // lateQueue when it sent the last update. The writer will fill again
- // the lateQueue when it will send the next update but we are not yet
- // there. So let's take the last change not sent directly from
- // the db.
-
- ReplicationIteratorComparator comparator =
- new ReplicationIteratorComparator();
- SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(comparator);
- try
- {
- // Build a list of candidates iterator (i.e. db i.e. server)
- for (short serverId : replicationServerDomain.getServers())
- {
- // get the last already sent CN from that server
- ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
- // get an iterator in this server db from that last change
- ReplicationIterator iterator =
- replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- // if that iterator has changes, then it is a candidate
- // it is added in the sorted list at a position given by its
- // current change (see ReplicationIteratorComparator).
- if ((iterator != null) && (iterator.getChange() != null))
- {
- iteratorSortedSet.add(iterator);
- }
- }
- UpdateMsg msg = iteratorSortedSet.first().getChange();
- result = msg.getChangeNumber();
- } catch (Exception e)
- {
- result = null;
- } finally
- {
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
- }
- } else
- {
- UpdateMsg msg = lateQueue.first();
- result = msg.getChangeNumber();
- }
- }
- }
- return result;
+ return assuredSdReceivedUpdatesTimeout;
}
/**
- * Check if the LDAP server can follow the speed of the other servers.
- * @return true when the server has all the not yet sent changes
- * in its queue.
+ * Get the number of updates sent to the server in assured safe data mode.
+ * @return The number of updates sent to the server in assured safe data mode
*/
- public boolean isFollowing()
+ public int getAssuredSdSentUpdates()
{
- return following;
+ return assuredSdSentUpdates;
}
/**
- * Set the following flag of this server.
- * @param following the value that should be set.
+ * Get the number of updates sent to the server in assured safe data mode that
+ * timed out.
+ * @return The number of updates sent to the server in assured safe data mode
+ * that timed out.
*/
- public void setFollowing(boolean following)
+ public AtomicInteger getAssuredSdSentUpdatesTimeout()
{
- this.following = following;
+ return assuredSdSentUpdatesTimeout;
}
/**
- * Add an update to the list of updates that must be sent to the server
- * managed by this ServerHandler.
+ * Get the number of updates received from the server in assured safe read
+ * mode.
+ * @return The number of updates received from the server in assured safe read
+ * mode
+ */
+ public int getAssuredSrReceivedUpdates()
+ {
+ return assuredSrReceivedUpdates;
+ }
+
+ /**
+ * Get the number of updates received from the server in assured safe read
+ * mode that timed out.
+ * @return The number of updates received from the server in assured safe read
+ * mode that timed out.
+ */
+ public AtomicInteger getAssuredSrReceivedUpdatesTimeout()
+ {
+ return assuredSrReceivedUpdatesTimeout;
+ }
+
+ /**
+ * Get the number of updates sent to the server in assured safe read mode.
+ * @return The number of updates sent to the server in assured safe read mode
+ */
+ public int getAssuredSrSentUpdates()
+ {
+ return assuredSrSentUpdates;
+ }
+
+ /**
+ * Get the number of updates sent to the server in assured safe read mode that
+ * timed out.
+ * @return The number of updates sent to the server in assured safe read mode
+ * that timed out.
+ */
+ public AtomicInteger getAssuredSrSentUpdatesTimeout()
+ {
+ return assuredSrSentUpdatesTimeout;
+ }
+
+ /**
+ * Returns the Replication Server Domain to which belongs this server handler.
*
- * @param update The update that must be added to the list of updates.
- * @param sourceHandler The server that sent the update.
+ * @return The replication server domain.
*/
- public void add(UpdateMsg update, ServerHandler sourceHandler)
+ public ReplicationServerDomain getDomain()
{
- synchronized (msgQueue)
- {
- /*
- * If queue was empty the writer thread was probably asleep
- * waiting for some changes, wake it up
- */
- if (msgQueue.isEmpty())
- msgQueue.notify();
-
- msgQueue.add(update);
-
- /* TODO : size should be configurable
- * and larger than max-receive-queue-size
- */
- while ((msgQueue.count() > maxQueueSize) ||
- (msgQueue.bytesCount() > maxQueueBytesSize))
- {
- setFollowing(false);
- msgQueue.removeFirst();
- }
- }
-
- if (isSaturated(update.getChangeNumber(), sourceHandler))
- {
- sourceHandler.setSaturated(true);
- }
-
- }
-
- private void setSaturated(boolean value)
- {
- flowControl = value;
+ return this.replicationServerDomain;
}
/**
- * Select the next update that must be sent to the server managed by this
- * ServerHandler.
- *
- * @return the next update that must be sent to the server managed by this
- * ServerHandler.
+ * Returns the value of generationId for that handler.
+ * @return The value of the generationId.
*/
- public UpdateMsg take()
+ public long getGenerationId()
{
- boolean interrupted = true;
- UpdateMsg msg = getnextMessage();
-
- /*
- * When we remove a message from the queue we need to check if another
- * server is waiting in flow control because this queue was too long.
- * This check might cause a performance penalty an therefore it
- * is not done for every message removed but only every few messages.
- */
- if (++saturationCount > 10)
- {
- saturationCount = 0;
- try
- {
- replicationServerDomain.checkAllSaturation();
- } catch (IOException e)
- {
- }
- }
- boolean acquired = false;
- do
- {
- try
- {
- acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS);
- interrupted = false;
- } catch (InterruptedException e)
- {
- // loop until not interrupted
- }
- } while (((interrupted) || (!acquired)) && (!shutdownWriter));
- if (msg != null)
- {
- incrementOutCount();
- if (msg.isAssured())
- {
- if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
- {
- incrementAssuredSrSentUpdates();
- } else
- {
- if (!isLDAPserver())
- incrementAssuredSdSentUpdates();
- }
- }
- }
- return msg;
+ return generationId;
}
/**
- * Get the next update that must be sent to the server
- * from the message queue or from the database.
- *
- * @return The next update that must be sent to the server.
+ * Gets the group id of the server represented by this object.
+ * @return The group id of the server represented by this object.
*/
- private UpdateMsg getnextMessage()
+ public byte getGroupId()
{
- UpdateMsg msg;
- while (activeWriter == true)
- {
- if (following == false)
- {
- /* this server is late with regard to some other masters
- * in the topology or just joined the topology.
- * In such cases, we can't keep all changes in the queue
- * without saturating the memory, we therefore use
- * a lateQueue that is filled with a few changes from the changelogDB
- * If this server is able to close the gap, it will start using again
- * the regular msgQueue later.
- */
- if (lateQueue.isEmpty())
- {
- /*
- * Start from the server State
- * Loop until the queue high mark or until no more changes
- * for each known LDAP master
- * get the next CSN after this last one :
- * - try to get next from the file
- * - if not found in the file
- * - try to get the next from the queue
- * select the smallest of changes
- * check if it is in the memory tree
- * yes : lock memory tree.
- * check all changes from the list, remove the ones that
- * are already sent
- * unlock memory tree
- * restart as usual
- * load this change on the delayList
- *
- */
- ReplicationIteratorComparator comparator =
- new ReplicationIteratorComparator();
- SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(comparator);
- /* fill the lateQueue */
- for (short serverId : replicationServerDomain.getServers())
- {
- ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
- ReplicationIterator iterator =
- replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- if (iterator != null)
- {
- if (iterator.getChange() != null)
- {
- iteratorSortedSet.add(iterator);
- } else
- {
- iterator.releaseCursor();
- }
- }
- }
-
- // The loop below relies on the fact that it is sorted based
- // on the currentChange of each iterator to consider the next
- // change across all servers.
- // Hence it is necessary to remove and eventual add again an iterator
- // when looping in order to keep consistent the order of the
- // iterators (see ReplicationIteratorComparator.
- while (!iteratorSortedSet.isEmpty() &&
- (lateQueue.count()<100) &&
- (lateQueue.bytesCount()<50000) )
- {
- ReplicationIterator iterator = iteratorSortedSet.first();
- iteratorSortedSet.remove(iterator);
- lateQueue.add(iterator.getChange());
- if (iterator.next())
- iteratorSortedSet.add(iterator);
- else
- iterator.releaseCursor();
- }
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
- /*
- * Check if the first change in the lateQueue is also on the regular
- * queue
- */
- if (lateQueue.isEmpty())
- {
- synchronized (msgQueue)
- {
- if ((msgQueue.count() < maxQueueSize) &&
- (msgQueue.bytesCount() < maxQueueBytesSize))
- {
- setFollowing(true);
- }
- }
- } else
- {
- msg = lateQueue.first();
- synchronized (msgQueue)
- {
- if (msgQueue.contains(msg))
- {
- /* we finally catch up with the regular queue */
- setFollowing(true);
- lateQueue.clear();
- UpdateMsg msg1;
- do
- {
- msg1 = msgQueue.removeFirst();
- } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
- this.updateServerState(msg);
- return msg;
- }
- }
- }
- } else
- {
- /* get the next change from the lateQueue */
- msg = lateQueue.removeFirst();
- this.updateServerState(msg);
- return msg;
- }
- }
- synchronized (msgQueue)
- {
- if (following == true)
- {
- try
- {
- while (msgQueue.isEmpty() && (following == true))
- {
- msgQueue.wait(500);
- if (!activeWriter)
- return null;
- }
- } catch (InterruptedException e)
- {
- return null;
- }
- if (following == true)
- {
- msg = msgQueue.removeFirst();
- if (this.updateServerState(msg))
- {
- /*
- * Only push the message if it has not yet been seen
- * by the other server.
- * Otherwise just loop to select the next message.
- */
- return msg;
- }
- }
- }
- }
- /*
- * Need to loop because following flag may have gone to false between
- * the first check at the beginning of this method
- * and the second check just above.
- */
- }
- return null;
+ return groupId;
}
/**
- * Update the serverState with the last message sent.
- *
- * @param msg the last update sent.
- * @return boolean indicating if the update was meaningful.
+ * Get our heartbeat interval.
+ * @return Our heartbeat interval.
*/
- public boolean updateServerState(UpdateMsg msg)
+ public long getHeartbeatInterval()
{
- return serverState.update(msg.getChangeNumber());
+ return heartbeatInterval;
}
/**
- * Get the state of this server.
- *
- * @return ServerState the state for this server..
+ * Get the count of updates received from the server.
+ * @return the count of update received from the server.
*/
- public ServerState getServerState()
+ public int getInCount()
{
- return serverState;
- }
-
- /**
- * Sends an ack message to the server represented by this object.
- *
- * @param ack The ack message to be sent.
- * @throws IOException In case of Exception thrown sending the ack.
- */
- public void sendAck(AckMsg ack) throws IOException
- {
- session.publish(ack);
- }
-
- /**
- * Check type of server handled.
- *
- * @return true if the handled server is an LDAP server.
- * false if the handled server is a replicationServer
- */
- public boolean isLDAPserver()
- {
- return serverIsLDAPserver;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void initializeMonitorProvider(MonitorProviderCfg configuration)
- throws ConfigException, InitializationException
- {
- // Nothing to do for now
- }
-
- /**
- * Retrieves the name of this monitor provider. It should be unique among all
- * monitor providers, including all instances of the same monitor provider.
- *
- * @return The name of this monitor provider.
- */
- @Override
- public String getMonitorInstanceName()
- {
- String str = serverURL + " " + String.valueOf(serverId);
-
- if (serverIsLDAPserver)
- return "Connected Replica " + str +
- ",cn=" + replicationServerDomain.getMonitorInstanceName();
- else
- return "Connected Replication Server " + str +
- ",cn=" + replicationServerDomain.getMonitorInstanceName();
- }
-
- /**
- * Retrieves the length of time in milliseconds that should elapse between
- * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero
- * return value indicates that the <CODE>updateMonitorData()</CODE> method
- * should not be periodically invoked.
- *
- * @return The length of time in milliseconds that should elapse between
- * calls to the <CODE>updateMonitorData()</CODE> method.
- */
- @Override
- public long getUpdateInterval()
- {
- /* we don't wont to do polling on this monitor */
- return 0;
- }
-
- /**
- * Performs any processing periodic processing that may be desired to update
- * the information associated with this monitor. Note that best-effort
- * attempts will be made to ensure that calls to this method come
- * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
- * be made.
- */
- @Override
- public void updateMonitorData()
- {
- // As long as getUpdateInterval() returns 0, this will never get called
+ return inCount;
}
/**
@@ -2021,101 +598,11 @@
@Override
public ArrayList<Attribute> getMonitorData()
{
- ArrayList<Attribute> attributes = new ArrayList<Attribute>();
- if (serverIsLDAPserver)
- {
- attributes.add(Attributes.create("replica", serverURL));
- attributes.add(Attributes.create("connected-to",
- this.replicationServerDomain.getReplicationServer()
- .getMonitorInstanceName()));
+ // Get the generic ones
+ ArrayList<Attribute> attributes = super.getMonitorData();
- }
- else
- {
- attributes.add(Attributes.create("Replication-Server",
- serverURL));
- }
- attributes.add(Attributes.create("server-id", String
- .valueOf(serverId)));
- attributes.add(Attributes.create("domain-name", baseDn.toString()));
-
- try
- {
- MonitorData md;
- md = replicationServerDomain.computeMonitorData();
-
- if (serverIsLDAPserver)
- {
- // Oldest missing update
- Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
- if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
- {
- Date date = new Date(approxFirstMissingDate);
- attributes.add(Attributes.create(
- "approx-older-change-not-synchronized", date.toString()));
- attributes.add(Attributes.create(
- "approx-older-change-not-synchronized-millis", String
- .valueOf(approxFirstMissingDate)));
- }
-
- // Missing changes
- long missingChanges = md.getMissingChanges(serverId);
- attributes.add(Attributes.create("missing-changes", String
- .valueOf(missingChanges)));
-
- // Replication delay
- long delay = md.getApproxDelay(serverId);
- attributes.add(Attributes.create("approximate-delay", String
- .valueOf(delay)));
-
- /* get the Server State */
- AttributeBuilder builder = new AttributeBuilder("server-state");
- ServerState state = md.getLDAPServerState(serverId);
- if (state != null)
- {
- for (String str : state.toStringSet())
- {
- builder.add(str);
- }
- attributes.add(builder.toAttribute());
- }
- }
- else
- {
- // Missing changes
- long missingChanges = md.getMissingChangesRS(serverId);
- attributes.add(Attributes.create("missing-changes", String
- .valueOf(missingChanges)));
-
- /* get the Server State */
- AttributeBuilder builder = new AttributeBuilder("server-state");
- ServerState state = md.getRSStates(serverId);
- if (state != null)
- {
- for (String str : state.toStringSet())
- {
- builder.add(str);
- }
- attributes.add(builder.toAttribute());
- }
- }
- }
- catch (Exception e)
- {
- Message message =
- ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e));
- // We failed retrieving the monitor data.
- attributes.add(Attributes.create("error", message.toString()));
- }
-
- attributes.add(
- Attributes.create("queue-size", String.valueOf(msgQueue.count())));
- attributes.add(
- Attributes.create(
- "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
- attributes.add(
- Attributes.create(
- "following", String.valueOf(following)));
+ attributes.add(Attributes.create("server-id", String.valueOf(serverId)));
+ attributes.add(Attributes.create("domain-name", getServiceId().toString()));
// Deprecated
attributes.add(Attributes.create("max-waiting-changes", String
@@ -2129,23 +616,23 @@
attributes.add(Attributes.create("assured-sr-received-updates", String
.valueOf(getAssuredSrReceivedUpdates())));
attributes.add(Attributes.create("assured-sr-received-updates-timeout",
- String .valueOf(getAssuredSrReceivedUpdatesTimeout())));
+ String .valueOf(getAssuredSrReceivedUpdatesTimeout())));
attributes.add(Attributes.create("assured-sr-sent-updates", String
.valueOf(getAssuredSrSentUpdates())));
attributes.add(Attributes.create("assured-sr-sent-updates-timeout", String
.valueOf(getAssuredSrSentUpdatesTimeout())));
attributes.add(Attributes.create("assured-sd-received-updates", String
.valueOf(getAssuredSdReceivedUpdates())));
- if (!isLDAPserver())
+ if (!isDataServer())
{
attributes.add(Attributes.create("assured-sd-sent-updates",
- String.valueOf(getAssuredSdSentUpdates())));
+ String.valueOf(getAssuredSdSentUpdates())));
attributes.add(Attributes.create("assured-sd-sent-updates-timeout",
- String.valueOf(getAssuredSdSentUpdatesTimeout())));
+ String.valueOf(getAssuredSdSentUpdatesTimeout())));
} else
{
attributes.add(Attributes.create("assured-sd-received-updates-timeout",
- String.valueOf(getAssuredSdReceivedUpdatesTimeout())));
+ String.valueOf(getAssuredSdReceivedUpdatesTimeout())));
}
// Window stats
@@ -2170,44 +657,484 @@
}
/**
+ * Retrieves the name of this monitor provider. It should be unique among all
+ * monitor providers, including all instances of the same monitor provider.
+ *
+ * @return The name of this monitor provider.
+ */
+ @Override
+ public abstract String getMonitorInstanceName();
+
+ /**
+ * Get the older update time for that server.
+ * @return The older update time.
+ */
+ public long getOlderUpdateTime()
+ {
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN == null)
+ return 0;
+ return olderUpdateCN.getTime();
+ }
+
+ /**
+ * Get the count of updates sent to this server.
+ * @return The count of update sent to this server.
+ */
+ public int getOutCount()
+ {
+ return outCount;
+ }
+
+ /**
+ * Gets the protocol version used with this remote server.
+ * @return The protocol version used with this remote server.
+ */
+ public short getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ /**
+ * get the Server Id.
+ *
+ * @return the ID of the server to which this object is linked
+ */
+ public short getServerId()
+ {
+ return serverId;
+ }
+
+ /**
+ * Retrieves the URL for this server handler.
+ *
+ * @return The URL for this server handler, in the form of an address and
+ * port separated by a colon.
+ */
+ public String getServerURL()
+ {
+ return serverURL;
+ }
+
+ /**
+ * Return the ServerStatus.
+ * @return The server status.
+ */
+ protected abstract ServerStatus getStatus();
+
+ /**
+ * Retrieves the length of time in milliseconds that should elapse between
+ * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero
+ * return value indicates that the <CODE>updateMonitorData()</CODE> method
+ * should not be periodically invoked.
+ *
+ * @return The length of time in milliseconds that should elapse between
+ * calls to the <CODE>updateMonitorData()</CODE> method.
+ */
+ @Override
+ public long getUpdateInterval()
+ {
+ /* we don't wont to do polling on this monitor */
+ return 0;
+ }
+
+ /**
+ * Increment the number of updates received from the server in assured safe
+ * data mode.
+ */
+ public void incrementAssuredSdReceivedUpdates()
+ {
+ assuredSdReceivedUpdates++;
+ }
+
+ /**
+ * Increment the number of updates received from the server in assured safe
+ * data mode that timed out.
+ */
+ public void incrementAssuredSdReceivedUpdatesTimeout()
+ {
+ assuredSdReceivedUpdatesTimeout.incrementAndGet();
+ }
+
+ /**
+ * Increment the number of updates sent to the server in assured safe data
+ * mode.
+ */
+ public void incrementAssuredSdSentUpdates()
+ {
+ assuredSdSentUpdates++;
+ }
+
+ /**
+ * Increment the number of updates sent to the server in assured safe data
+ * mode that timed out.
+ */
+ public void incrementAssuredSdSentUpdatesTimeout()
+ {
+ assuredSdSentUpdatesTimeout.incrementAndGet();
+ }
+
+ /**
+ * Increment the number of updates received from the server in assured safe
+ * read mode.
+ */
+ public void incrementAssuredSrReceivedUpdates()
+ {
+ assuredSrReceivedUpdates++;
+ }
+
+ /**
+ * Increment the number of updates received from the server in assured safe
+ * read mode that timed out.
+ */
+ public void incrementAssuredSrReceivedUpdatesTimeout()
+ {
+ assuredSrReceivedUpdatesTimeout.incrementAndGet();
+ }
+
+ /**
+ * Increment the number of updates sent to the server in assured safe read
+ * mode.
+ */
+ public void incrementAssuredSrSentUpdates()
+ {
+ assuredSrSentUpdates++;
+ }
+
+ /**
+ * Increment the number of updates sent to the server in assured safe read
+ * mode that timed out.
+ */
+ public void incrementAssuredSrSentUpdatesTimeout()
+ {
+ assuredSrSentUpdatesTimeout.incrementAndGet();
+ }
+
+ /**
+ * Increase the counter of update received from the server.
+ */
+ public void incrementInCount()
+ {
+ inCount++;
+ }
+
+ /**
+ * Increase the counter of updates sent to the server.
+ */
+ public void incrementOutCount()
+ {
+ outCount++;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException, InitializationException
+ {
+ // Nothing to do for now
+ }
+
+ /**
+ * Check if the server associated to this ServerHandler is a data server
+ * in the topology.
+ * @return true if the server is a data server.
+ */
+ public abstract boolean isDataServer();
+
+ /**
+ * Check if the server associated to this ServerHandler is a replication
+ * server.
+ * @return true if the server is a replication server.
+ */
+ public boolean isReplicationServer()
+ {
+ return (!this.isDataServer());
+ }
+
+ /**
+ * Lock the domain potentially with a timeout.
+ * @param timedout The provided timeout.
+ * @throws DirectoryException When an exception occurs.
+ */
+ protected void lockDomain(boolean timedout)
+ throws DirectoryException
+ {
+ // The handshake phase must be done by blocking any access to structures
+ // keeping info on connected servers, so that one can safely check for
+ // pre-existence of a server, send a coherent snapshot of known topology
+ // to peers, update the local view of the topology...
+ //
+ // For instance a kind of problem could be that while we connect with a
+ // peer RS, a DS is connecting at the same time and we could publish the
+ // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
+ //
+ // This method and every others that need to read/make changes to the
+ // structures holding topology for the domain should:
+ // - call ReplicationServerDomain.lock()
+ // - read/modify structures
+ // - call ReplicationServerDomain.release()
+ //
+ // More information is provided in comment of ReplicationServerDomain.lock()
+
+ // If domain already exists, lock it until handshake is finished otherwise
+ // it will be created and locked later in the method
+ try
+ {
+ if (!timedout)
+ {
+ // !timedout
+ if (!replicationServerDomain.hasLock())
+ replicationServerDomain.lock();
+ }
+ else
+ {
+ // timedout
+ /**
+ * Take the lock on the domain.
+ * WARNING: Here we try to acquire the lock with a timeout. This
+ * is for preventing a deadlock that may happen if there are cross
+ * connection attempts (for same domain) from this replication
+ * server and from a peer one:
+ * Here is the scenario:
+ * - RS1 connect thread takes the domain lock and starts
+ * connection to RS2
+ * - at the same time RS2 connect thread takes his domain lock and
+ * start connection to RS2
+ * - RS2 listen thread starts processing received
+ * ReplServerStartMsg from RS1 and wants to acquire the lock on
+ * the domain (here) but cannot as RS2 connect thread already has
+ * it
+ * - RS1 listen thread starts processing received
+ * ReplServerStartMsg from RS2 and wants to acquire the lock on
+ * the domain (here) but cannot as RS1 connect thread already has
+ * it
+ * => Deadlock: 4 threads are locked.
+ * So to prevent that in such situation, the listen threads here
+ * will both timeout trying to acquire the lock. The random time
+ * for the timeout should allow on connection attempt to be
+ * aborted whereas the other one should have time to finish in the
+ * same time.
+ * Warning: the minimum time (3s) should be big enough to allow
+ * normal situation connections to terminate. The added random
+ * time should represent a big enough range so that the chance to
+ * have one listen thread timing out a lot before the peer one is
+ * great. When the first listen thread times out, the remote
+ * connect thread should release the lock and allow the peer
+ * listen thread to take the lock it was waiting for and process
+ * the connection attempt.
+ */
+ Random random = new Random();
+ int randomTime = random.nextInt(6); // Random from 0 to 5
+ // Wait at least 3 seconds + (0 to 5 seconds)
+ long timeout = (long) (3000 + ( randomTime * 1000 ) );
+ boolean noTimeout = replicationServerDomain.tryLock(timeout);
+ if (!noTimeout)
+ {
+ // Timeout
+ Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+ getServiceId(),
+ Short.toString(serverId),
+ Short.toString(replicationServerId));
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Thread interrupted
+ Message message = ERR_EXCEPTION_LOCKING_RS_DOMAIN.get(e.getMessage());
+ logError(message);
+ }
+ }
+
+ /**
+ * Processes a routable message.
+ *
+ * @param msg The message to be processed.
+ */
+ public void process(RoutableMsg msg)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + this +
+ " processes received msg:\n" + msg);
+ replicationServerDomain.process(msg, this);
+ }
+
+ /**
+ * Process the reception of a WindowProbeMsg message.
+ *
+ * @param windowProbeMsg The message to process.
+ *
+ * @throws IOException When the session becomes unavailable.
+ */
+ public void process(WindowProbeMsg windowProbeMsg) throws IOException
+ {
+ if (rcvWindow > 0)
+ {
+ // The LDAP server believes that its window is closed
+ // while it is not, this means that some problem happened in the
+ // window exchange procedure !
+ // lets update the LDAP server with out current window size and hope
+ // that everything will work better in the futur.
+ // TODO also log an error message.
+ WindowMsg msg = new WindowMsg(rcvWindow);
+ session.publish(msg);
+ } else
+ {
+ // Both the LDAP server and the replication server believes that the
+ // window is closed. Lets check the flowcontrol in case we
+ // can now resume operations and send a windowMessage if necessary.
+ checkWindow();
+ }
+ }
+
+ /**
+ * Send an InitializeRequestMessage to the server connected through this
+ * handler.
+ *
+ * @param msg The message to be processed
+ * @throws IOException when raised by the underlying session
+ */
+ public void send(RoutableMsg msg) throws IOException
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + this +
+ " publishes message:\n" + msg);
+ session.publish(msg);
+ }
+
+ /**
+ * Sends an ack message to the server represented by this object.
+ *
+ * @param ack The ack message to be sent.
+ * @throws IOException In case of Exception thrown sending the ack.
+ */
+ public void sendAck(AckMsg ack) throws IOException
+ {
+ session.publish(ack);
+ }
+
+ /**
+ * Send an ErrorMsg to the peer.
+ *
+ * @param errorMsg The message to be sent
+ * @throws IOException when raised by the underlying session
+ */
+ public void sendError(ErrorMsg errorMsg) throws IOException
+ {
+ session.publish(errorMsg);
+ }
+
+ /**
+ * Send the ReplServerStartMsg to the remote server (RS or DS).
+ * @param requestedProtocolVersion The provided protocol version.
+ * @return The ReplServerStartMsg sent.
+ * @throws IOException When an exception occurs.
+ */
+ public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
+ throws IOException
+ {
+ this.localGenerationId = replicationServerDomain.getGenerationId();
+ ReplServerStartMsg outReplServerStartMsg
+ = new ReplServerStartMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold());
+
+ if (requestedProtocolVersion>0)
+ session.publish(outReplServerStartMsg, requestedProtocolVersion);
+ else
+ session.publish(outReplServerStartMsg);
+
+ return outReplServerStartMsg;
+ }
+
+ /**
+ * Sends the provided TopologyMsg to the peer server.
+ *
+ * @param topoMsg The TopologyMsg message to be sent.
+ * @throws IOException When it occurs while sending the message,
+ *
+ */
+ public void sendTopoInfo(TopologyMsg topoMsg)
+ throws IOException
+ {
+ // V1 Rs do not support the TopologyMsg
+ if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ session.publish(topoMsg);
+ }
+ }
+
+ /**
+ * Set a new generation ID.
+ *
+ * @param generationId The new generation ID
+ *
+ */
+ public void setGenerationId(long generationId)
+ {
+ this.generationId = generationId;
+ }
+
+ /**
+ * Sets the replication server domain associated.
+ * @param rsd The provided replication server domain.
+ */
+ protected void setReplicationServerDomain(ReplicationServerDomain rsd)
+ {
+ this.replicationServerDomain = rsd;
+ }
+
+ /**
+ * Sets the window size when used when sending to the remote.
+ * @param size The provided window size.
+ */
+ protected void setSendWindowSize(int size)
+ {
+ this.sendWindowSize = size;
+ }
+
+ /**
+ * Requests to shutdown the writer.
+ */
+ protected void shutdownWriter()
+ {
+ shutdownWriter = true;
+ }
+
+ /**
* Shutdown This ServerHandler.
*/
public void shutdown()
{
- /*
- * Shutdown ServerWriter
- */
- shutdownWriter = true;
- activeWriter = false;
- synchronized (msgQueue)
- {
- /* wake up the writer thread on an empty queue so that it disappear */
- msgQueue.clear();
- msgQueue.notify();
- msgQueue.notifyAll();
- }
+ shutdownWriter();
+ setConsumerActive(false);
+ super.shutdown();
- /*
- * Close session to end ServerReader or ServerWriter
- */
- try
+ if (session != null)
{
- session.close();
- } catch (IOException e)
- {
- // ignore.
- }
-
- /*
- * Stop the remote LSHandler
- */
- synchronized (directoryServers)
- {
- for (LightweightServerHandler lsh : directoryServers.values())
+ // Close session to end ServerReader or ServerWriter
+ try
{
- lsh.stopHandler();
+ session.close();
+ } catch (IOException e)
+ {
+ // ignore.
}
- directoryServers.clear();
}
/*
@@ -2240,68 +1167,93 @@
{
// don't try anymore to join and return.
}
+ if (debugEnabled())
+ TRACER.debugInfo("SH.shutdowned(" + this + ")");
}
/**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- String localString;
- if (serverId != 0)
- {
- if (serverIsLDAPserver)
- localString = "Directory Server ";
- else
- localString = "Replication Server ";
-
-
- localString += serverId + " " + serverURL + " " + baseDn;
- } else
- localString = "Unknown server";
-
- return localString;
- }
-
- /**
- * Decrement the protocol window, then check if it is necessary
- * to send a WindowMsg and send it.
+ * Select the next update that must be sent to the server managed by this
+ * ServerHandler.
*
- * @throws IOException when the session becomes unavailable.
+ * @return the next update that must be sent to the server managed by this
+ * ServerHandler.
*/
- public synchronized void decAndCheckWindow() throws IOException
+ public UpdateMsg take()
{
- rcvWindow--;
- checkWindow();
- }
+ boolean interrupted = true;
+ UpdateMsg msg = getnextMessage(true); // synchronous:block until msg
- /**
- * Check the protocol window and send WindowMsg if necessary.
- *
- * @throws IOException when the session becomes unavailable.
- */
- public synchronized void checkWindow() throws IOException
- {
- if (rcvWindow < rcvWindowSizeHalf)
+ /*
+ * When we remove a message from the queue we need to check if another
+ * server is waiting in flow control because this queue was too long.
+ * This check might cause a performance penalty an therefore it
+ * is not done for every message removed but only every few messages.
+ */
+ if (++saturationCount > 10)
{
- if (flowControl)
+ saturationCount = 0;
+ try
{
- if (replicationServerDomain.restartAfterSaturation(this))
- {
- flowControl = false;
- }
- }
- if (!flowControl)
+ replicationServerDomain.checkAllSaturation();
+ } catch (IOException e)
{
- WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
- session.publish(msg);
- rcvWindow += rcvWindowSizeHalf;
}
}
+ boolean acquired = false;
+ do
+ {
+ try
+ {
+ acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS);
+ interrupted = false;
+ } catch (InterruptedException e)
+ {
+ // loop until not interrupted
+ }
+ } while (((interrupted) || (!acquired)) && (!shutdownWriter));
+ if (msg != null)
+ {
+ incrementOutCount();
+
+ if (msg.isAssured())
+ {
+ if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
+ {
+ incrementAssuredSrSentUpdates();
+ } else
+ {
+ if (!isDataServer())
+ incrementAssuredSdSentUpdates();
+ }
+ }
+ }
+ return msg;
}
/**
+ * Creates a RSInfo structure representing this remote RS.
+ * @return The RSInfo structure representing this remote RS
+ */
+ public RSInfo toRSInfo()
+ {
+ RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+
+ return rsInfo;
+ }
+
+ /**
+ * Performs any processing periodic processing that may be desired to update
+ * the information associated with this monitor. Note that best-effort
+ * attempts will be made to ensure that calls to this method come
+ * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
+ * be made.
+ */
+ @Override
+ public void updateMonitorData()
+ {
+ // As long as getUpdateInterval() returns 0, this will never get called
+ }
+ /**
* Update the send window size based on the credit specified in the
* given window message.
*
@@ -2314,510 +1266,122 @@
}
/**
- * Get our heartbeat interval.
- * @return Our heartbeat interval.
+ * Log the messages involved in the start handshake.
+ * @param inStartMsg The message received first.
+ * @param outStartMsg The message sent in response.
*/
- public long getHeartbeatInterval()
- {
- return heartbeatInterval;
- }
-
- /**
- * Processes a routable message.
- *
- * @param msg The message to be processed.
- */
- public void process(RoutableMsg msg)
+ protected void logStartHandshakeRCVandSND(
+ StartMsg inStartMsg,
+ StartMsg outStartMsg)
{
if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " SH for remote server " + this.getMonitorInstanceName() + ":" +
- "\nprocesses received msg:\n" + msg);
- replicationServerDomain.process(msg, this);
- }
-
- /**
- * Sends the provided TopologyMsg to the peer server.
- *
- * @param topoMsg The TopologyMsg message to be sent.
- * @throws IOException When it occurs while sending the message,
- *
- */
- public void sendTopoInfo(TopologyMsg topoMsg)
- throws IOException
- {
- // V1 Rs do not support the TopologyMsg
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " SH for remote server " + this.getMonitorInstanceName() + ":" +
- "\nsends message:\n" + topoMsg);
-
- session.publish(topoMsg);
- }
- }
-
- /**
- * Stores topology information received from a peer RS and that must be kept
- * in RS handler.
- *
- * @param topoMsg The received topology message
- */
- public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
- {
- // Store info for remote RS
- List<RSInfo> rsInfos = topoMsg.getRsList();
- // List should only contain RS info for sender
- RSInfo rsInfo = rsInfos.get(0);
- generationId = rsInfo.getGenerationId();
- groupId = rsInfo.getGroupId();
-
- /**
- * Store info for DSs connected to the peer RS
- */
- List<DSInfo> dsInfos = topoMsg.getDsList();
-
- synchronized (directoryServers)
- {
- // Removes the existing structures
- for (LightweightServerHandler lsh : directoryServers.values())
- {
- lsh.stopHandler();
- }
- directoryServers.clear();
-
- // Creates the new structure according to the message received.
- for (DSInfo dsInfo : dsInfos)
- {
- LightweightServerHandler lsh = new LightweightServerHandler(this,
- serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
- dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
- dsInfo.isAssured(), dsInfo.getAssuredMode(),
- dsInfo.getSafeDataLevel());
- lsh.startHandler();
- directoryServers.put(lsh.getServerId(), lsh);
- }
- }
- }
-
- /**
- * Process message of a remote server changing his status.
- * @param csMsg The message containing the new status
- * @return The new server status of the DS
- */
- public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
- {
-
- // Sanity check
- if (!serverIsLDAPserver)
- {
- Message msg =
- ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(baseDn.toString(),
- Short.toString(serverId), csMsg.toString());
- logError(msg);
- return ServerStatus.INVALID_STATUS;
- }
-
- // Get the status the DS just entered
- ServerStatus reqStatus = csMsg.getNewStatus();
- // Translate new status to a state machine event
- StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
- if (event == StatusMachineEvent.INVALID_EVENT)
- {
- Message msg = ERR_RS_INVALID_NEW_STATUS.get(reqStatus.toString(),
- baseDn.toString(), Short.toString(serverId));
- logError(msg);
- return ServerStatus.INVALID_STATUS;
- }
-
- // Check state machine allows this new status
- ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
- if (newStatus == ServerStatus.INVALID_STATUS)
- {
- Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
- Short.toString(serverId), status.toString(), event.toString());
- logError(msg);
- return ServerStatus.INVALID_STATUS;
- }
-
- status = newStatus;
-
- return status;
- }
-
- /**
- * Change the status according to the event generated from the status
- * analyzer.
- * @param event The event to be used for new status computation
- * @return The new status of the DS
- * @throws IOException When raised by the underlying session
- */
- public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
- throws IOException
- {
- // Check state machine allows this new status (Sanity check)
- ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
- if (newStatus == ServerStatus.INVALID_STATUS)
- {
- Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
- Short.toString(serverId), status.toString(), event.toString());
- logError(msg);
- // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
- // and vice versa. We may are being trying to change the status while for
- // instance another status has just been entered: e.g a full update has
- // just been engaged. In that case, just ignore attempt to change the
- // status
- return newStatus;
- }
-
- // Send message requesting to change the DS status
- ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
- ServerStatus.INVALID_STATUS);
-
- if (debugEnabled())
- {
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " Sending change status from status analyzer to " + getServerId() +
- " for baseDn " + baseDn + ":\n" + csMsg);
- }
-
- session.publish(csMsg);
-
- status = newStatus;
-
- return newStatus;
- }
-
- /**
- * When this handler is connected to a replication server, specifies if
- * a wanted server is connected to this replication server.
- *
- * @param wantedServer The server we want to know if it is connected
- * to the replication server represented by this handler.
- * @return boolean True is the wanted server is connected to the server
- * represented by this handler.
- */
- public boolean isRemoteLDAPServer(short wantedServer)
- {
- synchronized (directoryServers)
- {
- for (LightweightServerHandler server : directoryServers.values())
- {
- if (wantedServer == server.getServerId())
- {
- return true;
- }
- }
- return false;
- }
- }
-
- /**
- * When the handler is connected to a replication server, specifies the
- * replication server has remote LDAP servers connected to it.
- *
- * @return boolean True is the replication server has remote LDAP servers
- * connected to it.
- */
- public boolean hasRemoteLDAPServers()
- {
- synchronized (directoryServers)
- {
- return !directoryServers.isEmpty();
- }
- }
-
- /**
- * Send an InitializeRequestMessage to the server connected through this
- * handler.
- *
- * @param msg The message to be processed
- * @throws IOException when raised by the underlying session
- */
- public void send(RoutableMsg msg) throws IOException
- {
- if (debugEnabled())
TRACER.debugInfo("In " +
replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " SH for remote server " + this.getMonitorInstanceName() + ":" +
- "\nsends message:\n" + msg);
- session.publish(msg);
- }
-
- /**
- * Send an ErrorMsg to the peer.
- *
- * @param errorMsg The message to be sent
- * @throws IOException when raised by the underlying session
- */
- public void sendError(ErrorMsg errorMsg) throws IOException
- {
- session.publish(errorMsg);
- }
-
- /**
- * Process the reception of a WindowProbeMsg message.
- *
- * @param windowProbeMsg The message to process.
- *
- * @throws IOException When the session becomes unavailable.
- */
- public void process(WindowProbeMsg windowProbeMsg) throws IOException
- {
- if (rcvWindow > 0)
- {
- // The LDAP server believes that its window is closed
- // while it is not, this means that some problem happened in the
- // window exchange procedure !
- // lets update the LDAP server with out current window size and hope
- // that everything will work better in the futur.
- // TODO also log an error message.
- WindowMsg msg = new WindowMsg(rcvWindow);
- session.publish(msg);
- } else
- {
- // Both the LDAP server and the replication server believes that the
- // window is closed. Lets check the flowcontrol in case we
- // can now resume operations and send a windowMessage if necessary.
- checkWindow();
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + ":" +
+ "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
+ "\nAND REPLIED:\n" + outStartMsg.toString());
}
}
/**
- * Returns the value of generationId for that handler.
- * @return The value of the generationId.
+ * Log the messages involved in the start handshake.
+ * @param outStartMsg The message sent first.
+ * @param inStartMsg The message received in response.
*/
- public long getGenerationId()
+ protected void logStartHandshakeSNDandRCV(
+ StartMsg outStartMsg,
+ StartMsg inStartMsg)
{
- return generationId;
- }
-
- /**
- * Sends a message containing a generationId to a peer server.
- * The peer is expected to be a replication server.
- *
- * @param msg The GenerationIdMessage message to be sent.
- * @throws IOException When it occurs while sending the message,
- *
- */
- public void forwardGenerationIdToRS(ResetGenerationIdMsg msg)
- throws IOException
- {
- session.publish(msg);
- }
-
- /**
- * Set a new generation ID.
- *
- * @param generationId The new generation ID
- *
- */
- public void setGenerationId(long generationId)
- {
- this.generationId = generationId;
- }
-
- /**
- * Returns the Replication Server Domain to which belongs this server handler.
- *
- * @return The replication server domain.
- */
- public ReplicationServerDomain getDomain()
- {
- return this.replicationServerDomain;
- }
-
- /**
- * Return a Set containing the servers known by this replicationServer.
- * @return a set containing the servers known by this replicationServer.
- */
- public Set<Short> getConnectedDirectoryServerIds()
- {
- synchronized (directoryServers)
- {
- return directoryServers.keySet();
- }
- }
-
- /**
- * Order the peer DS server to change his status or close the connection
- * according to the requested new generation id.
- * @param newGenId The new generation id to take into account
- * @throws IOException If IO error occurred.
- */
- public void changeStatusForResetGenId(long newGenId)
- throws IOException
- {
- StatusMachineEvent event = null;
-
- if (newGenId == -1)
- {
- // The generation id is being made invalid, let's put the DS
- // into BAD_GEN_ID_STATUS
- event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
- } else
- {
- if (newGenId == generationId)
- {
- if (status == ServerStatus.BAD_GEN_ID_STATUS)
- {
- // This server has the good new reference generation id.
- // Close connection with him to force his reconnection: DS will
- // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
-
- if (debugEnabled())
- {
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- ". Closing connection to DS " + getServerId() +
- " for baseDn " + baseDn + " to force reconnection as new local" +
- " generation id and remote one match and DS is in bad gen id: " +
- newGenId);
- }
-
- // Connection closure must not be done calling RSD.stopHandler() as it
- // would rewait the RSD lock that we already must have entering this
- // method. This would lead to a reentrant lock which we do not want.
- // So simply close the session, this will make the hang up appear
- // after the reader thread that took the RSD lock realeases it.
- try
- {
- if (session != null)
- session.close();
- } catch (IOException e)
- {
- // ignore
- }
-
- // NOT_CONNECTED_STATUS is the last one in RS session life: handler
- // will soon disappear after this method call...
- status = ServerStatus.NOT_CONNECTED_STATUS;
- return;
- } else
- {
- if (debugEnabled())
- {
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- ". DS " + getServerId() + " for baseDn " + baseDn +
- " has already generation id " + newGenId +
- " so no ChangeStatusMsg sent to him.");
- }
- return;
- }
- } else
- {
- // This server has a bad generation id compared to new reference one,
- // let's put it into BAD_GEN_ID_STATUS
- event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
- }
- }
-
- if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
- (status == ServerStatus.FULL_UPDATE_STATUS))
- {
- // Prevent useless error message (full update status cannot lead to bad
- // gen status)
- Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
- Short.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
- baseDn.toString(),
- Short.toString(serverId),
- Long.toString(generationId),
- Long.toString(newGenId));
- logError(message);
- return;
- }
-
- ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
-
- if (newStatus == ServerStatus.INVALID_STATUS)
- {
- Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
- Short.toString(serverId), status.toString(), event.toString());
- logError(msg);
- return;
- }
-
- // Send message requesting to change the DS status
- ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
- ServerStatus.INVALID_STATUS);
-
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " Sending change status for reset gen id to " + getServerId() +
- " for baseDn " + baseDn + ":\n" + csMsg);
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + ":" +
+ "\nSH START HANDSHAKE SENT("+ this +
+ "):\n" + outStartMsg.toString()+
+ "\nAND RECEIVED:\n" + inStartMsg.toString());
}
-
- session.publish(csMsg);
-
- status = newStatus;
}
/**
- * Set the shut down flag to true and returns the previous value of the flag.
- * @return The previous value of the shut down flag
+ * Log the messages involved in the Topology handshake.
+ * @param inTopoMsg The message received first.
+ * @param outTopoMsg The message sent in response.
*/
- public boolean engageShutdown()
+ protected void logTopoHandshakeRCVandSND(
+ TopologyMsg inTopoMsg,
+ TopologyMsg outTopoMsg)
{
- // Use thread safe boolean
- return shuttingDown.getAndSet(true);
- }
-
- /**
- * Gets the status of the connected DS.
- * @return The status of the connected DS.
- */
- public ServerStatus getStatus()
- {
- return status;
- }
-
- /**
- * Gets the protocol version used with this remote server.
- * @return The protocol version used with this remote server.
- */
- public short getProtocolVersion()
- {
- return protocolVersion;
- }
-
- /**
- * Add the DSinfos of the connected Directory Servers
- * to the List of DSInfo provided as a parameter.
- *
- * @param dsInfos The List of DSInfo that should be updated
- * with the DSInfo for the directoryServers
- * connected to this ServerHandler.
- */
- public void addDSInfos(List<DSInfo> dsInfos)
- {
- synchronized (directoryServers)
+ if (debugEnabled())
{
- for (LightweightServerHandler ls : directoryServers.values())
- {
- dsInfos.add(ls.toDSInfo());
- }
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + ":" +
+ "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
+ "\nAND REPLIED:\n" + outTopoMsg.toString());
}
}
/**
- * Gets the group id of the server represented by this object.
- * @return The group id of the server represented by this object.
+ * Log the messages involved in the Topology handshake.
+ * @param outTopoMsg The message sent first.
+ * @param inTopoMsg The message received in response.
*/
- public byte getGroupId()
+ protected void logTopoHandshakeSNDandRCV(
+ TopologyMsg outTopoMsg,
+ TopologyMsg inTopoMsg)
{
- return groupId;
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + ":" +
+ "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
+ "\nAND RECEIVED:\n" + inTopoMsg.toString());
+ }
}
+
+ /**
+ * Log the messages involved in the Topology/StartSession handshake.
+ * @param inStartSessionMsg The message received first.
+ * @param outTopoMsg The message sent in response.
+ */
+ protected void logStartSessionHandshake(
+ StartSessionMsg inStartSessionMsg,
+ TopologyMsg outTopoMsg)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + " :" +
+ "\nSH SESSION HANDSHAKE RECEIVED:\n" +
+ "\nAND REPLIED:\n" + outTopoMsg.toString());
+ }
+ }
+
+ /**
+ * Log the messages involved in the Topology/StartSession handshake.
+ * @param inStartECLSessionMsg The message received first.
+ */
+ protected void logStartECLSessionHandshake(
+ StartECLSessionMsg inStartECLSessionMsg)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + " :" +
+ "\nSH SESSION HANDSHAKE RECEIVED:\n" +
+ inStartECLSessionMsg.toString());
+ }
+ }
+
}
--
Gitblit v1.10.0