| | |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | import java.util.List; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.Map; |
| | | import java.util.Random; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachine; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | |
/**
 * This class defines a server handler, which handles all interaction with a
 * peer server (RS or DS).
 */
| | | public class ServerHandler extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
/**
 * Time during which the server will wait for existing threads to stop
 * during the shutdown.
 */
| | | private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; |
| | | |
| | | /* |
| | | * Properties, filled if remote server is either a DS or a RS |
| | | */ |
| | | private short serverId; |
| | | private ProtocolSession session; |
| | | private final MsgQueue msgQueue = new MsgQueue(); |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private final Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private final Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ReplicationServerDomain replicationServerDomain = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | | |
| | | private int inCount = 0; // number of updates received from the server |
| | | |
| | | private int inAckCount = 0; |
| | | private int outAckCount = 0; |
| | | private int maxReceiveQueue = 0; |
| | |
| | | private boolean serverIsLDAPserver; |
| | | private boolean following = false; |
| | | private ServerState serverState; |
| | | private boolean active = true; |
| | | private boolean activeWriter = true; |
| | | private ServerWriter writer = null; |
| | | private DN baseDn = null; |
| | | private String serverAddressURL; |
| | | private int rcvWindow; |
| | | private int rcvWindowSizeHalf; |
| | | private int maxRcvWindow; |
| | |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | private boolean flowControl = false; // indicate that the server is |
| | | // flow controlled and should |
| | | // be stopped from sending messages. |
| | | // flow controlled and should |
| | | // be stopped from sending messages. |
| | | |
| | | private int saturationCount = 0; |
| | | private short replicationServerId; |
| | | |
| | | private short protocolVersion; |
| | | private short protocolVersion = -1; |
| | | private long generationId = -1; |
| | | |
| | | // Group id of this remote server |
| | | private byte groupId = (byte) -1; |
| | | |
| | | /* |
| | | * Properties filled only if remote server is a DS |
| | | */ |
| | | |
| | | // Status of this DS |
| | | private ServerStatus status = ServerStatus.INVALID_STATUS; |
| | | // Referrals URLs this DS is exporting |
| | | private List<String> refUrls = new ArrayList<String>(); |
| | | // Assured replication enabled on DS or not |
| | | private boolean assuredFlag = false; |
| | | // DS assured mode (relevant if assured replication enabled) |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // DS safe data level (relevant if assured mode is safe data) |
| | | private byte safeDataLevel = (byte) -1; |
| | | |
| | | /* |
| | | * Properties filled only if remote server is a RS |
| | | */ |
| | | private String serverAddressURL; |
| | | /** |
| | | * When this Handler is related to a remote replication server |
| | | * this collection will contain as many elements as there are |
| | | * LDAP servers connected to the remote replication server. |
| | | */ |
| | | private final Map<Short, LightweightServerHandler> connectedServers = |
| | | private final Map<Short, LightweightServerHandler> directoryServers = |
| | | new ConcurrentHashMap<Short, LightweightServerHandler>(); |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | |
| | | /** |
| | | * The thread that will send heartbeats. |
| | | */ |
| | | HeartbeatThread heartbeatThread = null; |
| | | |
| | | /** |
| | | * Set when ServerWriter is stopping. |
| | | */ |
| | | private boolean shutdownWriter = false; |
| | | /** |
| | | * Set when ServerHandler is stopping. |
| | | */ |
| | | private boolean shutdown = false; |
| | | |
| | | private AtomicBoolean shuttingDown = new AtomicBoolean(false); |
| | | private static final Map<ChangeNumber, ReplServerAckMessageList> |
| | | changelogsWaitingAcks = |
| | | new HashMap<ChangeNumber, ReplServerAckMessageList>(); |
| | | changelogsWaitingAcks = |
| | | new HashMap<ChangeNumber, ReplServerAckMessageList>(); |
| | | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | this.maxQueueBytesSize = queueSize * 100; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | } |
| | | |
/**
 * Creates a DSInfo structure representing this remote DS.
 * @return The DSInfo structure representing this remote DS
 */
| | | public DSInfo toDSInfo() |
| | | { |
| | | DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId, |
| | | status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls); |
| | | |
| | | return dsInfo; |
| | | } |
| | | |
| | | /** |
| | | * Creates a RSInfo structure representing this remote RS. |
| | | * @return The RSInfo structure representing this remote RS |
| | | */ |
| | | public RSInfo toRSInfo() |
| | | { |
| | | RSInfo rsInfo = new RSInfo(serverId, generationId, groupId); |
| | | |
| | | return rsInfo; |
| | | } |
| | | |
| | | /** |
| | | * Do the handshake with either the DS or RS and then create the reader and |
| | | * writer thread. |
| | | * |
| | | * There are 2 possible handshake sequences: DS<->RS and RS<->RS. Each one are |
| | | * divided into 2 logical consecutive phases (phase 1 and phase 2): |
| | | * |
| | | * DS<->RS (DS (always initiating connection) always sends first message): |
| | | * ------- |
| | | * |
| | | * phase 1: |
| | | * DS --- ServerStartMsg ---> RS |
| | | * DS <--- ReplServerStartMsg --- RS |
| | | * phase 2: |
| | | * DS --- StartSessionMsg ---> RS |
| | | * DS <--- TopologyMsg --- RS |
| | | * |
| | | * RS<->RS (RS initiating connection always sends first message): |
| | | * ------- |
| | | * |
| | | * phase 1: |
| | | * RS1 --- ReplServerStartMsg ---> RS2 |
| | | * RS1 <--- ReplServerStartMsg --- RS2 |
| | | * phase 2: |
| | | * RS1 --- TopologyMsg ---> RS2 |
| | | * RS1 <--- TopologyMsg --- RS2 |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection (listen). |
| | |
| | | * handler. |
| | | */ |
| | | public void start(DN baseDn, short replicationServerId, |
| | | String replicationServerURL, |
| | | int windowSize, boolean sslEncryption, |
| | | ReplicationServer replicationServer) |
| | | String replicationServerURL, |
| | | int windowSize, boolean sslEncryption, |
| | | ReplicationServer replicationServer) |
| | | { |
| | | |
| | | // The handshake phase must be done by blocking any access to structures |
| | | // keeping info on connected servers, so that one can safely check for |
| | | // pre-existence of a server, send a coherent snapshot of known topology |
| | | // to peers, update the local view of the topology... |
| | | // |
| | | // For instance a kind of problem could be that while we connect with a |
| | | // peer RS, a DS is connecting at the same time and we could publish the |
| | | // connected DSs to the peer RS forgetting this last DS in the TopologyMsg. |
| | | // |
| | | // This method and every others that need to read/make changes to the |
| | | // structures holding topology for the domain should: |
| | | // - call ReplicationServerDomain.lock() |
| | | // - read/modify structures |
| | | // - call ReplicationServerDomain.release() |
| | | // |
| | | // More information is provided in comment of ReplicationServerDomain.lock() |
| | | |
| | | // If domain already exists, lock it until handshake is finished otherwise |
| | | // it will be created and locked later in the method |
| | | if (baseDn != null) |
| | | { |
| | | ReplicationServerDomain rsd = |
| | | replicationServer.getReplicationServerDomain(baseDn, false); |
| | | if (rsd != null) |
| | | { |
| | | try |
| | | { |
| | | rsd.lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | |
| | | long oldGenerationId = -100; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + |
| | | " starts a new LS or RS " + |
| | | ((baseDn == null)?"incoming connection":"outgoing connection")); |
| | | " starts a new LS or RS " + |
| | | ((baseDn == null) ? "incoming connection" : "outgoing connection")); |
| | | |
| | | this.replicationServerId = replicationServerId; |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | rcvWindowSizeHalf = windowSize / 2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | long localGenerationId = -1; |
| | | boolean handshakeOnly = false; |
| | | ReplServerStartMsg outReplServerStartMsg = null; |
| | | |
| | | /** |
| | | * This boolean prevents from logging a polluting error when connection\ |
| | | * aborted from a DS that wanted only to perform handshake phase 1 in order |
| | | * to determine the best suitable RS: |
| | | * 1) -> ServerStartMsg |
| | | * 2) <- ReplServerStartMsg |
| | | * 3) connection closure |
| | | */ |
| | | boolean log_error_message = true; |
| | | |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | | /* |
| | | * PROCEDE WITH FIRST PHASE OF HANDSHAKE: |
| | | * ServerStartMsg then ReplServerStartMsg (with a DS) |
| | | * OR |
| | | * ReplServerStartMsg then ReplServerStartMsg (with a RS) |
| | | */ |
| | | |
| | | if (baseDn != null) // Outgoing connection |
| | | |
| | | { |
| | | // This is an outgoing connection. Publish our start message. |
| | | this.baseDn = baseDn; |
| | | |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(baseDn, true); |
| | | replicationServer.getReplicationServerDomain(baseDn, true); |
| | | if (!replicationServerDomain.hasLock()) |
| | | { |
| | | try |
| | | { |
| | | replicationServerDomain.lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | } |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | ServerState localServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState, |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption); |
| | | replicationServerDomain.getDbServerState(); |
| | | outReplServerStartMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, |
| | | baseDn, windowSize, localServerState, |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption, |
| | | replicationServer.getGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | |
| | | session.publish(msg); |
| | | session.publish(outReplServerStartMsg); |
| | | } |
| | | |
| | | // Wait and process ServerStart or ReplServerStart |
| | | ReplicationMessage msg = session.receive(); |
| | | if (msg instanceof ServerStartMessage) |
| | | // Wait and process ServerStartMsg or ReplServerStartMsg |
| | | ReplicationMsg msg = session.receive(); |
| | | if (msg instanceof ServerStartMsg) |
| | | { |
| | | // The remote server is an LDAP Server. |
| | | ServerStartMessage receivedMsg = (ServerStartMessage) msg; |
| | | ServerStartMsg serverStartMsg = (ServerStartMsg) msg; |
| | | |
| | | generationId = receivedMsg.getGenerationId(); |
| | | generationId = serverStartMsg.getGenerationId(); |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | this.serverState = receivedMsg.getServerState(); |
| | | serverStartMsg.getVersion()); |
| | | serverId = serverStartMsg.getServerId(); |
| | | serverURL = serverStartMsg.getServerURL(); |
| | | this.baseDn = serverStartMsg.getBaseDn(); |
| | | this.serverState = serverStartMsg.getServerState(); |
| | | this.groupId = serverStartMsg.getGroupId(); |
| | | |
| | | maxReceiveDelay = receivedMsg.getMaxReceiveDelay(); |
| | | maxReceiveQueue = receivedMsg.getMaxReceiveQueue(); |
| | | maxSendDelay = receivedMsg.getMaxSendDelay(); |
| | | maxSendQueue = receivedMsg.getMaxSendQueue(); |
| | | heartbeatInterval = receivedMsg.getHeartbeatInterval(); |
| | | |
| | | handshakeOnly = receivedMsg.isHandshakeOnly(); |
| | | maxReceiveDelay = serverStartMsg.getMaxReceiveDelay(); |
| | | maxReceiveQueue = serverStartMsg.getMaxReceiveQueue(); |
| | | maxSendDelay = serverStartMsg.getMaxSendDelay(); |
| | | maxSendQueue = serverStartMsg.getMaxSendQueue(); |
| | | heartbeatInterval = serverStartMsg.getHeartbeatInterval(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | | sslEncryption = receivedMsg.getSSLEncryption(); |
| | | sslEncryption = serverStartMsg.getSSLEncryption(); |
| | | |
| | | if (maxReceiveQueue > 0) |
| | | restartReceiveQueue = (maxReceiveQueue > 1000 ? |
| | | maxReceiveQueue - 200 : |
| | | maxReceiveQueue*8/10); |
| | | restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue - |
| | | 200 : maxReceiveQueue * 8 / 10); |
| | | else |
| | | restartReceiveQueue = 0; |
| | | |
| | | if (maxSendQueue > 0) |
| | | restartSendQueue = (maxSendQueue > 1000 ? maxSendQueue - 200 : |
| | | maxSendQueue*8/10); |
| | | restartSendQueue = |
| | | (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 / |
| | | 10); |
| | | else |
| | | restartSendQueue = 0; |
| | | |
| | | if (maxReceiveDelay > 0) |
| | | restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 : |
| | | maxReceiveDelay); |
| | | restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1 |
| | | : maxReceiveDelay); |
| | | else |
| | | restartReceiveDelay = 0; |
| | | |
| | | if (maxSendDelay > 0) |
| | | restartSendDelay = (maxSendDelay > 10 ? |
| | | maxSendDelay -1 : |
| | | maxSendDelay); |
| | | restartSendDelay = |
| | | (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay); |
| | | else |
| | | restartSendDelay = 0; |
| | | |
| | |
| | | |
| | | // Get or Create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | |
| | | replicationServerDomain.waitDisconnection(receivedMsg.getServerId()); |
| | | replicationServerDomain.mayResetGenerationId(); |
| | | // Hack to be sure that if a server disconnects and reconnect, we |
| | | // let the reader thread see the closure and cleanup any reference |
| | | // to old connection |
| | | replicationServerDomain.waitDisconnection(serverStartMsg.getServerId()); |
| | | |
| | | if (!replicationServerDomain.hasLock()) |
| | | { |
| | | try |
| | | { |
| | | replicationServerDomain.lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // Duplicate server ? |
| | | if (!replicationServerDomain.checkForDuplicateDS(this)) |
| | | { |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | ServerState localServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | replicationServerDomain.getDbServerState(); |
| | | // This an incoming connection. Publish our start message |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState, |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | ReplServerStartMsg replServerStartMsg = |
| | | new ReplServerStartMsg(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState, |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption, |
| | | replicationServer.getGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | session.publish(replServerStartMsg); |
| | | sendWindowSize = serverStartMsg.getWindowSize(); |
| | | |
| | | /* Until here session is encrypted then it depends on the negotiation */ |
| | | /* Until here session is encrypted then it depends on the |
| | | negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = |
| | | replicationServerDomain.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer().getMonitorInstanceName() + |
| | | ", SH received START from LS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | | " localGenerationId=" + localGenerationId + |
| | | " state=" + ss + |
| | | " and sent ReplServerStart with state=" + lss); |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + serverStartMsg.toString() + |
| | | "\nAND REPLIED:\n" + replServerStartMsg.toString()); |
| | | } |
| | | |
| | | /* |
| | | * If we have already a generationID set for the domain |
| | | * then |
| | | * if the connecting replica has not the same |
| | | * then it is degraded locally and notified by an error message |
| | | * else |
| | | * we set the generationID from the one received |
| | | * (unsaved yet on disk . will be set with the 1rst change received) |
| | | */ |
| | | if (localGenerationId>0) |
| | | { |
| | | if (generationId != localGenerationId) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | receivedMsg.getBaseDn().toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // We are an empty Replication Server |
| | | if ((generationId>0)&&(!serverState.isEmpty())) |
| | | { |
| | | // If the LDAP server has already sent changes |
| | | // it is not expected to connect to an empty RS |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | receivedMsg.getBaseDn().toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | else |
| | | { |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | } |
| | | else if (msg instanceof ReplServerStartMessage) |
| | | } else if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | // The remote server is a replication server |
| | | ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; |
| | | ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) msg; |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | generationId = receivedMsg.getGenerationId(); |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | inReplServerStartMsg.getVersion()); |
| | | generationId = inReplServerStartMsg.getGenerationId(); |
| | | serverId = inReplServerStartMsg.getServerId(); |
| | | serverURL = inReplServerStartMsg.getServerURL(); |
| | | int separator = serverURL.lastIndexOf(':'); |
| | | serverAddressURL = |
| | | session.getRemoteAddress() + ":" + serverURL.substring(separator + 1); |
| | | session.getRemoteAddress() + ":" + serverURL.substring(separator + |
| | | 1); |
| | | serverIsLDAPserver = false; |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | // We support connection from a V1 RS |
| | | // Only V2 protocol has the group id in repl server start message |
| | | this.groupId = inReplServerStartMsg.getGroupId(); |
| | | } |
| | | this.baseDn = inReplServerStartMsg.getBaseDn(); |
| | | |
| | | if (baseDn == null) // Reply to incoming RS |
| | | |
| | | { |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = replicationServer. |
| | | getReplicationServerDomain(this.baseDn, true); |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | if (!replicationServerDomain.hasLock()) |
| | | { |
| | | try |
| | | { |
| | | /** |
| | | * Take the lock on the domain. |
| | | * WARNING: Here we try to acquire the lock with a timeout. This |
| | | * is for preventing a deadlock that may happen if there are cross |
| | | * connection attempts (for same domain) from this replication |
| | | * server and from a peer one: |
| | | * Here is the scenario: |
| | | * - RS1 connect thread takes the domain lock and starts |
| | | * connection to RS2 |
| | | * - at the same time RS2 connect thread takes his domain lock and |
| | | * start connection to RS2 |
| | | * - RS2 listen thread starts processing received |
| | | * ReplServerStartMsg from RS1 and wants to acquire the lock on |
| | | * the domain (here) but cannot as RS2 connect thread already has |
| | | * it |
| | | * - RS1 listen thread starts processing received |
| | | * ReplServerStartMsg from RS2 and wants to acquire the lock on |
| | | * the domain (here) but cannot as RS1 connect thread already has |
| | | * it |
| | | * => Deadlock: 4 threads are locked. |
| | | * So to prevent that in such situation, the listen threads here |
| | | * will both timeout trying to acquire the lock. The random time |
| | | * for the timeout should allow on connection attempt to be |
| | | * aborted whereas the other one should have time to finish in the |
| | | * same time. |
| | | * Warning: the minimum time (3s) should be big enough to allow |
| | | * normal situation connections to terminate. The added random |
| | | * time should represent a big enough range so that the chance to |
| | | * have one listen thread timing out a lot before the peer one is |
| | | * great. When the first listen thread times out, the remote |
| | | * connect thread should release the lock and allow the peer |
| | | * listen thread to take the lock it was waiting for and process |
| | | * the connection attempt. |
| | | */ |
| | | Random random = new Random(); |
| | | int randomTime = random.nextInt(6); // Random from 0 to 5 |
| | | // Wait at least 3 seconds + (0 to 5 seconds) |
| | | long timeout = (long) (3000 + ( randomTime * 1000 ) ); |
| | | boolean noTimeout = replicationServerDomain.tryLock(timeout); |
| | | if (!noTimeout) |
| | | { |
| | | // Timeout |
| | | Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(serverId), |
| | | Short.toString(replicationServer.getServerId())); |
| | | closeSession(message); |
| | | return; |
| | | } |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Thread interrupted, return. |
| | | return; |
| | | } |
| | | } |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | ServerState serverState = replicationServerDomain.getDbServerState(); |
| | | ServerState domServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | | sslEncryption = receivedMsg.getSSLEncryption(); |
| | | sslEncryption = inReplServerStartMsg.getSSLEncryption(); |
| | | |
| | | // Publish our start message |
| | | ReplServerStartMessage outMsg = |
| | | new ReplServerStartMessage(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, serverState, |
| | | protocolVersion, |
| | | localGenerationId, |
| | | sslEncryption); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | | { |
| | | this.baseDn = baseDn; |
| | | } |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | outReplServerStartMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, domServerState, |
| | | protocolVersion, |
| | | localGenerationId, |
| | | sslEncryption, |
| | | replicationServer.getGroupId(), |
| | | replicationServerDomain. |
| | | getReplicationServer().getDegradedStatusThreshold()); |
| | | |
| | | /* Until here session is encrypted then it depends on the negociation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | session.publish(outReplServerStartMsg); |
| | | } else { |
| | | // We support connection from a V1 RS, send PDU with V1 form |
| | | session.publish(outReplServerStartMsg, |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V1); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = |
| | | replicationServerDomain.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer().getMonitorInstanceName() + |
| | | ", SH received START from RS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | | " localGenerationId=" + localGenerationId + |
| | | " state=" + ss + |
| | | " and sent ReplServerStart with state=" + lss); |
| | | } |
| | | |
| | | // if the remote RS and the local RS have the same genID |
| | | // then it's ok and nothing else to do |
| | | if (generationId == localGenerationId) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + inReplServerStartMsg.toString() + |
| | | "\nAND REPLIED:\n" + outReplServerStartMsg.toString()); |
| | | } |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | if (localGenerationId>0) |
| | | // Did the remote RS answer with the DN we provided him ? |
| | | if (!(this.baseDn.equals(baseDn))) |
| | | { |
| | | // if the local RS is initialized |
| | | if (generationId>0) |
| | | { |
| | | // if the remote RS is initialized |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationServerDomain.getGenerationIdSavedStatus()) |
| | | { |
| | | // it the present RS has received changes regarding its |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | else |
| | | { |
| | | // The present RS has never received changes regarding its |
| | | // gen ID. |
| | | // |
| | | // Example case: |
| | | // - we are in RS1 |
| | | // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) |
| | | // - RS1 has genId1 from LS1 /genId1 comes from data in suffix |
| | | // - we are in RS1 and we receive a START msg from RS2 |
| | | // - Each RS keeps its genID / is degraded and when LS2 will |
| | | // be populated from LS1 everything will becomes ok. |
| | | // |
| | | // Issue: |
| | | // FIXME : Would it be a good idea in some cases to just |
| | | // set the gen ID received from the peer RS |
| | | // specially if the peer has a non nul state and |
| | | // we have a nul state ? |
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // The remote RS had no genId while the local one has one genID. |
| | | // In our start msg, we have just sent our local genID to |
| | | // the remote RS that will immediatly adopt it |
| | | // So let's store our local genID as the genID of the remote RS. |
| | | // It is necessary to do so, in order to not have a 'bad genID' |
| | | // error when we will try to send updates to the remote RS |
| | | // (before receiving the infoMsg from the remote RS !!!) |
| | | generationId = localGenerationId; |
| | | } |
| | | Message message = ERR_RS_DN_DOES_NOT_MATCH.get( |
| | | this.baseDn.toString(), |
| | | baseDn.toString()); |
| | | closeSession(message); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | else |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE SENT:\n" + outReplServerStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + inReplServerStartMsg.toString()); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | this.serverState = inReplServerStartMsg.getServerState(); |
| | | sendWindowSize = inReplServerStartMsg.getWindowSize(); |
| | | |
| | | // Duplicate server ? |
| | | if (!replicationServerDomain.checkForDuplicateRS(this)) |
| | | { |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | /* Up to this point the session is encrypted; from here on it depends
| | | on the negotiation */
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | } else |
| | | { |
| | | // TODO : log error |
| | | return; // we did not recognize the message, ignore it |
| | | // We did not recognize the message, close session as what |
| | | // can happen after is undetermined and we do not want the server to |
| | | // be disturbed |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = replicationServer. |
| | | getReplicationServerDomain(this.baseDn,true); |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { // Only protocol version above V1 has a phase 2 handshake |
| | | |
| | | if (!handshakeOnly) |
| | | { |
| | | boolean started; |
| | | if (serverIsLDAPserver) |
| | | /* |
| | | * NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
| | | * TopologyMsg then TopologyMsg (with a RS) |
| | | * OR |
| | | * StartSessionMsg then TopologyMsg (with a DS) |
| | | */ |
| | | |
| | | TopologyMsg outTopoMsg = null; |
| | | |
| | | if (baseDn != null) // Outgoing connection to a RS |
| | | |
| | | { |
| | | started = replicationServerDomain.startServer(this); |
| | | } |
| | | else |
| | | { |
| | | started = replicationServerDomain.startReplicationServer(this); |
| | | // Send our own TopologyMsg to remote RS |
| | | outTopoMsg = replicationServerDomain.createTopologyMsgForRS(); |
| | | session.publish(outTopoMsg); |
| | | } |
| | | |
| | | if (started) |
| | | // Wait and process TopologyMsg or StartSessionMsg |
| | | log_error_message = false; |
| | | ReplicationMsg msg2 = session.receive(); |
| | | log_error_message = true; |
| | | if (msg2 instanceof TopologyMsg) |
| | | { |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | // Remote RS sent his topo msg |
| | | TopologyMsg inTopoMsg = (TopologyMsg) msg2; |
| | | |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | // CONNECTION WITH A RS |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatThread = new HeartbeatThread( |
| | | "replication Heartbeat to " + serverURL + |
| | | " for " + this.baseDn, |
| | | session, heartbeatInterval/3); |
| | | heartbeatThread.start(); |
| | | } |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | | else |
| | | { |
| | | // the connection is not valid, close it. |
| | | try |
| | | // if the remote RS and the local RS have the same genID |
| | | // then it's ok and nothing else to do |
| | | if (generationId == localGenerationId) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS failed to start locally " + |
| | | " the connection from serverID="+serverId); |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | } |
| | | session.close(); |
| | | } catch (IOException e1) |
| | | } else |
| | | { |
| | | // ignore |
| | | if (localGenerationId > 0) |
| | | { |
| | | // if the local RS is initialized |
| | | if (generationId > 0) |
| | | { |
| | | // if the remote RS is initialized |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationServerDomain.getGenerationIdSavedStatus()) |
| | | { |
| | | // if the present RS has received changes regarding its
| | | // gen ID and so won't change without a reset
| | | // then we are just degrading the peer.
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } else |
| | | { |
| | | // The present RS has never received changes regarding its |
| | | // gen ID. |
| | | // |
| | | // Example case: |
| | | // - we are in RS1 |
| | | // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) |
| | | // - RS1 has genId1 from LS1 /genId1 comes from data in |
| | | // suffix |
| | | // - we are in RS1 and we receive a START msg from RS2 |
| | | // - Each RS keeps its genID / is degraded and when LS2 |
| | | // will be populated from LS1 everything will become ok. |
| | | // |
| | | // Issue: |
| | | // FIXME : Would it be a good idea in some cases to just |
| | | // set the gen ID received from the peer RS |
| | | // especially if the peer has a non-null state and
| | | // we have a null state?
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | // The remote RS has no genId. We don't change anything for the |
| | | // current RS. |
| | | } |
| | | } else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | // WARNING: Must be done before computing topo message to send |
| | | // to peer server as topo message must embed valid generation id |
| | | // for our server |
| | | oldGenerationId = |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | |
| | | if (baseDn == null) // Reply to the RS (incoming connection) |
| | | |
| | | { |
| | | // Send our own TopologyMsg to remote RS |
| | | outTopoMsg = replicationServerDomain.createTopologyMsgForRS(); |
| | | session.publish(outTopoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() + |
| | | "\nAND REPLIED:\n" + outTopoMsg.toString()); |
| | | } |
| | | } else |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE SENT:\n" + outTopoMsg.toString() + |
| | | "\nAND RECEIVED:\n" + inTopoMsg.toString()); |
| | | } |
| | | } |
| | | |
| | | // Alright, connected with new RS (either outgoing or incoming |
| | | // connection): store handler. |
| | | Map<Short, ServerHandler> connectedRSs = |
| | | replicationServerDomain.getConnectedRSs(); |
| | | connectedRSs.put(serverId, this); |
| | | |
| | | // Process TopologyMsg sent by remote RS: store matching new info |
| | | // (this will also warn our connected DSs of the new received info) |
| | | replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false); |
| | | |
| | | } else if (msg2 instanceof StartSessionMsg) |
| | | { |
| | | // CONNECTION WITH A DS |
| | | |
| | | // Process StartSessionMsg sent by remote DS |
| | | StartSessionMsg startSessionMsg = (StartSessionMsg) msg2; |
| | | |
| | | this.status = startSessionMsg.getStatus(); |
| | | // Sanity check: is it a valid initial status? |
| | | if (!isValidInitialStatus(this.status)) |
| | | { |
| | | Message mesg = ERR_RS_INVALID_INIT_STATUS.get( |
| | | this.status.toString(), this.baseDn.toString(), |
| | | Short.toString(serverId)); |
| | | closeSession(mesg); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | this.refUrls = startSessionMsg.getReferralsURLs(); |
| | | this.assuredFlag = startSessionMsg.isAssured(); |
| | | this.assuredMode = startSessionMsg.getAssuredMode(); |
| | | this.safeDataLevel = startSessionMsg.getSafeDataLevel(); |
| | | |
| | | /* |
| | | * If we have already a generationID set for the domain |
| | | * then |
| | | * if the connecting replica has not the same |
| | | * then it is degraded locally and notified by an error message |
| | | * else |
| | | * we set the generationID from the one received |
| | | * (unsaved yet on disk . will be set with the 1rst change |
| | | * received) |
| | | */ |
| | | if (localGenerationId > 0) |
| | | { |
| | | if (generationId != localGenerationId) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } |
| | | } else |
| | | { |
| | | // We are an empty Replicationserver |
| | | if ((generationId > 0) && (!serverState.isEmpty())) |
| | | { |
| | | // If the LDAP server has already sent changes |
| | | // it is not expected to connect to an empty RS |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | logError(message); |
| | | } else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | // WARNING: Must be done before computing topo message to send |
| | | // to peer server as topo message must embed valid generation id |
| | | // for our server |
| | | oldGenerationId = |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | |
| | | // Send our own TopologyMsg to DS |
| | | outTopoMsg = replicationServerDomain.createTopologyMsgForDS( |
| | | this.serverId); |
| | | session.publish(outTopoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + |
| | | "\nSH HANDSHAKE RECEIVED:\n" + startSessionMsg.toString() + |
| | | "\nAND REPLIED:\n" + outTopoMsg.toString()); |
| | | } |
| | | |
| | | // Alright, connected with new DS: store handler. |
| | | Map<Short, ServerHandler> connectedDSs = |
| | | replicationServerDomain.getConnectedDSs(); |
| | | connectedDSs.put(serverId, this); |
| | | |
| | | // Tell peer DSs a new DS just connected to us |
| | | // No need to resend topo msg to this just new DS so not null |
| | | // argument |
| | | replicationServerDomain.sendTopoInfoToDSs(this); |
| | | // Tell peer RSs a new DS just connected to us |
| | | replicationServerDomain.sendTopoInfoToRSs(); |
| | | } else |
| | | { |
| | | // We did not recognize the message, close session as what |
| | | // can happen after is undetermined and we do not want the server to |
| | | // be disturbed |
| | | closeSession(null); |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | return; |
| | | } |
| | | } |
| | | else |
| | | |
| | | /* |
| | | * FINALIZE INITIALIZATION: |
| | | * CREATE READER AND WRITER, HEARTBEAT SYSTEM AND UPDATE MONITORING |
| | | * SYSTEM |
| | | */ |
| | | |
| | | // Disable timeout for next communications |
| | | session.setSoTimeout(0); |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | // For a handshakeOnly connection, let's only create a reader
| | | // in order to detect the connection closure. |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader.start(); |
| | | heartbeatThread = new HeartbeatThread( |
| | | "Replication Heartbeat to DS " + serverURL + " " + serverId + |
| | | " for " + this.baseDn + " in RS " + replicationServerId, |
| | | session, heartbeatInterval / 3); |
| | | heartbeatThread.start(); |
| | | } |
| | | |
| | | // Create the status analyzer for the domain if not already started |
| | | if (serverIsLDAPserver) |
| | | { |
| | | if (!replicationServerDomain.isRunningStatusAnalyzer()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " is starting status analyzer"); |
| | | replicationServerDomain.startStatusAnalyzer(); |
| | | } |
| | | } |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } catch (NotSupportedOldVersionPDUException e) |
| | | { |
| | | // We do not need to support DS V1 connection, we just accept RS V1 |
| | | // connection: |
| | | // We just trash the message, log the event for debug purpose and close |
| | | // the connection |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ":" |
| | | + e.getMessage()); |
| | | closeSession(null); |
| | | } catch (Exception e) |
| | | { |
| | | // We do not want polluting error log if error is due to normal session |
| | | // aborted after handshake phase one from a DS that is searching for best |
| | | // suitable RS. |
| | | if ( log_error_message || (baseDn != null) ) |
| | | { |
| | | // some problem happened, reject the connection |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_REPLICATION_SERVER_CONNECTION_ERROR.get( |
| | | this.getMonitorInstanceName())); |
| | | mb.append(": " + stackTraceToSingleLineString(e)); |
| | | closeSession(mb.toMessage()); |
| | | } else |
| | | { |
| | | closeSession(null); |
| | | } |
| | | |
| | | // If generation id of domain was changed, set it back to old value |
| | | // We may have changed it as it was -1 and we received a value >0 from |
| | | // peer server and the last topo message sent may have failed being |
| | | // sent: in that case retrieve old value of generation id for |
| | | // replication server domain |
| | | if (oldGenerationId != -100) |
| | | { |
| | | replicationServerDomain.setGenerationId(oldGenerationId, false); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | |
| | | // Release domain |
| | | if ((replicationServerDomain != null) && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | } |
| | | |
| | | /* |
| | | * Close the session logging the passed error message |
| | | * Log nothing if message is null. |
| | | */ |
| | | private void closeSession(Message msg) // Logs msg through logError when it is non-null, then closes the session.
| | | {
| | | if (msg != null)
| | | {
| | | // some problem happened, reject the connection. NOTE(review): the MessageBuilder lines and the first session.close() below use 'e', which is not declared anywhere in this method; they look like pre-revision lines interleaved by the diff rendering -- confirm against the real file.
| | | MessageBuilder mb = new MessageBuilder();
| | | mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
| | | this.getMonitorInstanceName()));
| | | mb.append(stackTraceToSingleLineString(e));
| | | logError(mb.toMessage());
| | | try
| | | {
| | | session.close();
| | | } catch (IOException e1)
| | | {
| | | // ignore: a failure while closing the session is deliberately not reported
| | | }
| | | logError(msg);
| | | }
| | | try
| | | {
| | | session.close();
| | | } catch (IOException ee)
| | | {
| | | // ignore: a failure while closing the session is deliberately not reported
| | | }
| | | }
| | | |
| | |
| | | * @return true is saturated false if not saturated. |
| | | */ |
| | | public boolean isSaturated(ChangeNumber changeNumber, |
| | | ServerHandler sourceHandler) |
| | | ServerHandler sourceHandler) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | |
| | | return true; |
| | | |
| | | if ((sourceHandler.maxSendQueue > 0) && |
| | | (size >= sourceHandler.maxSendQueue)) |
| | | (size >= sourceHandler.maxSendQueue)) |
| | | return true; |
| | | |
| | | if (!msgQueue.isEmpty()) |
| | | { |
| | | UpdateMessage firstUpdate = msgQueue.first(); |
| | | UpdateMsg firstUpdate = msgQueue.first(); |
| | | |
| | | if (firstUpdate != null) |
| | | { |
| | | long timeDiff = changeNumber.getTimeSec() - |
| | | firstUpdate.getChangeNumber().getTimeSec(); |
| | | firstUpdate.getChangeNumber().getTimeSec(); |
| | | |
| | | if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay)) |
| | | return true; |
| | | |
| | | if ((sourceHandler.maxSendDelay > 0) && |
| | | (timeDiff >= sourceHandler.maxSendDelay)) |
| | | (timeDiff >= sourceHandler.maxSendDelay)) |
| | | return true; |
| | | } |
| | | } |
| | |
| | | if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) |
| | | return false; |
| | | if ((source != null) && (source.maxSendQueue > 0) && |
| | | (queueSize >= source.restartSendQueue)) |
| | | (queueSize >= source.restartSendQueue)) |
| | | return false; |
| | | |
| | | if (!msgQueue.isEmpty()) |
| | | { |
| | | UpdateMessage firstUpdate = msgQueue.first(); |
| | | UpdateMessage lastUpdate = msgQueue.last(); |
| | | UpdateMsg firstUpdate = msgQueue.first(); |
| | | UpdateMsg lastUpdate = msgQueue.last(); |
| | | |
| | | if ((firstUpdate != null) && (lastUpdate != null)) |
| | | { |
| | | long timeDiff = lastUpdate.getChangeNumber().getTimeSec() - |
| | | firstUpdate.getChangeNumber().getTimeSec(); |
| | | firstUpdate.getChangeNumber().getTimeSec(); |
| | | if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay)) |
| | | return false; |
| | | if ((source != null) && (source.maxSendDelay > 0) |
| | | && (timeDiff >= source.restartSendDelay)) |
| | | if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >= |
| | | source.restartSendDelay)) |
| | | return false; |
| | | } |
| | | } |
| | |
| | | */ |
| | | public int getRcvMsgQueueSize() // Returns msgQueue.count() when isFollowing(); otherwise a count computed from the domain DB state and serverState.
| | | {
| | | synchronized (msgQueue)
| | | {
| | | /*
| | | * When the server is up to date or close to being up to date,
| | | * the number of updates to be sent is the size of the receive queue.
| | | */
| | | if (isFollowing())
| | | return msgQueue.count();
| | | else
| | | {
| | | /*
| | | * When the server is not able to follow, the msgQueue
| | | * may become too large and therefore won't contain all the
| | | * changes. Some changes may only be stored in the backing DB
| | | * of the servers.
| | | * The total size of the receive queue is calculated by doing
| | | * the sum of the number of missing changes for every dbHandler.
| | | * NOTE(review): the loop below assigns totalCount on every
| | | * iteration instead of accumulating it, so the value returned is
| | | * the difference for the last id only, not the sum described above.
| | | */
| | | int totalCount = 0;
| | | ServerState dbState = replicationServerDomain.getDbServerState();
| | | for (short id : dbState)
| | | {
| | | totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
| | | serverState.getMaxChangeNumber(id));
| | | }
| | | return totalCount;
| | | }
| | | }
| | | synchronized (msgQueue) // NOTE(review): every path of the preceding synchronized block returns, so this second block looks like the replacement revision shown by the diff rendering -- confirm against the real file.
| | | {
| | | /*
| | | * When the server is up to date or close to being up to date,
| | | * the number of updates to be sent is the size of the receive queue.
| | | */
| | | if (isFollowing())
| | | return msgQueue.count();
| | | else
| | | {
| | | /*
| | | * When the server is not able to follow, the msgQueue
| | | * may become too large and therefore won't contain all the
| | | * changes. Some changes may only be stored in the backing DB
| | | * of the servers.
| | | * The total size of the receive queue is calculated by doing
| | | * the sum of the number of missing changes for every dbHandler.
| | | * Here that computation is delegated to ServerState.diffChanges.
| | | */
| | | ServerState dbState = replicationServerDomain.getDbServerState();
| | | return ServerState.diffChanges(dbState, serverState);
| | | }
| | | }
| | | }
| | | |
| | | /** |
| | |
| | | return 0; |
| | | |
| | | long currentTime = TimeThread.getTime(); |
| | | return ((currentTime - olderUpdateTime)/1000); |
| | | return ((currentTime - olderUpdateTime) / 1000); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Long getApproxFirstMissingDate() |
| | | { |
| | | Long result = (long)0; |
| | | Long result = (long) 0; |
| | | |
| | | // Get the older CN received |
| | | ChangeNumber olderUpdateCN = getOlderUpdateCN(); |
| | |
| | | { |
| | | // If not present in the local RS db, |
| | | // then approximate with the older update time |
| | | result=olderUpdateCN.getTime(); |
| | | result = olderUpdateCN.getTime(); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | ChangeNumber olderUpdateCN = getOlderUpdateCN(); |
| | | if (olderUpdateCN == null) |
| | | return 0; |
| | | return olderUpdateCN.getTime(); |
| | | return olderUpdateCN.getTime(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (msgQueue.isEmpty()) |
| | | { |
| | | result=null; |
| | | } |
| | | else |
| | | result = null; |
| | | } else |
| | | { |
| | | UpdateMessage msg = msgQueue.first(); |
| | | UpdateMsg msg = msgQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | } |
| | | UpdateMessage msg = iteratorSortedSet.first().getChange(); |
| | | UpdateMsg msg = iteratorSortedSet.first().getChange(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | catch(Exception e) |
| | | } catch (Exception e) |
| | | { |
| | | result=null; |
| | | } |
| | | finally |
| | | result = null; |
| | | } finally |
| | | { |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | UpdateMessage msg = lateQueue.first(); |
| | | UpdateMsg msg = lateQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an update the list of updates that must be sent to the server |
| | | * Add an update to the list of updates that must be sent to the server |
| | | * managed by this ServerHandler. |
| | | * |
| | | * @param update The update that must be added to the list of updates. |
| | | * @param sourceHandler The server that sent the update. |
| | | */ |
| | | public void add(UpdateMessage update, ServerHandler sourceHandler) |
| | | public void add(UpdateMsg update, ServerHandler sourceHandler) |
| | | { |
| | | /* |
| | | * Ignore updates from a server that is degraded due to |
| | | * its inconsistent generationId |
| | | */ |
| | | long referenceGenerationId = replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != generationId)) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_TO.get( |
| | | this.replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(), |
| | | update.getDn(), |
| | | this.getMonitorInstanceName(), |
| | | Long.toString(generationId), |
| | | Long.toString(referenceGenerationId))); |
| | | |
| | | return; |
| | | } |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | |
| | | * @return the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | */ |
| | | public UpdateMessage take() |
| | | public UpdateMsg take() |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMessage msg = getnextMessage(); |
| | | UpdateMsg msg = getnextMessage(); |
| | | |
| | | /* |
| | | * When we remove a message from the queue we need to check if another |
| | |
| | | try |
| | | { |
| | | replicationServerDomain.checkAllSaturation(); |
| | | } |
| | | catch (IOException e) |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS); |
| | | acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while (((interrupted) || (!acquired )) && (!shutdown)); |
| | | } while (((interrupted) || (!acquired)) && (!shutdownWriter)); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | |
| | | * |
| | | * @return The next update that must be sent to the server. |
| | | */ |
| | | private UpdateMessage getnextMessage() |
| | | private UpdateMsg getnextMessage() |
| | | { |
| | | UpdateMessage msg; |
| | | while (active == true) |
| | | UpdateMsg msg; |
| | | while (activeWriter == true) |
| | | { |
| | | if (following == false) |
| | | { |
| | |
| | | if (iterator.getChange() != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | |
| | | setFollowing(true); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | msg = lateQueue.first(); |
| | | synchronized (msgQueue) |
| | |
| | | /* we finally catch up with the regular queue */ |
| | | setFollowing(true); |
| | | lateQueue.clear(); |
| | | UpdateMessage msg1; |
| | | UpdateMsg msg1; |
| | | do |
| | | { |
| | | msg1 = msgQueue.removeFirst(); |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | /* get the next change from the lateQueue */ |
| | | msg = lateQueue.removeFirst(); |
| | |
| | | while (msgQueue.isEmpty()) |
| | | { |
| | | msgQueue.wait(500); |
| | | if (!active) |
| | | if (!activeWriter) |
| | | return null; |
| | | } |
| | | } catch (InterruptedException e) |
| | |
| | | } |
| | | } |
| | | } |
| | | /* |
| | | * Need to loop because following flag may have gone to false between |
| | | * the first check at the beginning of this method |
| | | * and the second check just above. |
| | | */ |
| | | /* |
| | | * Need to loop because following flag may have gone to false between |
| | | * the first check at the beginning of this method |
| | | * and the second check just above. |
| | | */ |
| | | } |
| | | return null; |
| | | } |
| | |
| | | * @param msg the last update sent. |
| | | * @return boolean indicating if the update was meaningful. |
| | | */ |
| | | public boolean updateServerState(UpdateMessage msg)
| | | public boolean updateServerState(UpdateMsg msg) // NOTE(review): two signatures differing only in the parameter type (UpdateMessage vs UpdateMsg) precede a single body; this looks like the before/after lines of a type rename -- confirm.
| | | {
| | | return serverState.update(msg.getChangeNumber()); // passes the update's change number to serverState and returns that call's result
| | | }
| | |
| | | } |
| | | |
| | | /** |
| | | * Stop this server handler processing. |
| | | */ |
| | | public void stopHandler() // Clears the active flag, stops the lightweight handlers, closes the session, wakes threads waiting on msgQueue, stops the heartbeat thread and deregisters the monitor provider.
| | | {
| | | active = false; // cleared before any resource below is released
| | | 
| | | // Stop every LightweightServerHandler held in connectedServers, then empty the map
| | | for (LightweightServerHandler lsh : connectedServers.values())
| | | {
| | | lsh.stopHandler();
| | | }
| | | connectedServers.clear();
| | | 
| | | try
| | | {
| | | session.close();
| | | } catch (IOException e)
| | | {
| | | // ignore: a failure while closing the session is deliberately not reported
| | | }
| | | 
| | | synchronized (msgQueue)
| | | {
| | | /* wake up the writer thread on an empty queue so that it disappears.
| | | NOTE(review): notify() immediately followed by notifyAll() looks like
| | | the before/after lines of one change; confirm which call is intended. */
| | | msgQueue.clear();
| | | msgQueue.notify();
| | | msgQueue.notifyAll();
| | | }
| | | 
| | | // Stop the heartbeat thread, if one was created.
| | | if (heartbeatThread != null)
| | | {
| | | heartbeatThread.shutdown();
| | | }
| | | 
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
| | | }
| | | |
| | | /** |
| | | * Send the ack to the server that did the original modification. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update that is acked. |
| | |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber) throws IOException // Publishes an ack for changeNumber on the session and increments outAckCount.
| | | {
| | | AckMessage ack = new AckMessage(changeNumber);
| | | AckMsg ack = new AckMsg(changeNumber); // NOTE(review): 'ack' is declared twice (AckMessage, then AckMsg); this looks like the before/after lines of a type rename -- confirm.
| | | session.publish(ack);
| | | outAckCount++; // incremented only after publish() has returned
| | | }
| | |
| | | * @param message The ack message that was received. |
| | | * @param ackingServerId The id of the server that acked the change. |
| | | */ |
| | | public void ack(AckMessage message, short ackingServerId) |
| | | public void ack(AckMsg message, short ackingServerId) |
| | | { |
| | | ChangeNumber changeNumber = message.getChangeNumber(); |
| | | AckMessageList ackList; |
| | |
| | | * @param message the ack message that was received. |
| | | * @param ackingServerId The id of the server that acked the change. |
| | | */ |
| | | public static void ackChangelog(AckMessage message, short ackingServerId) |
| | | public static void ackChangelog(AckMsg message, short ackingServerId) |
| | | { |
| | | ChangeNumber changeNumber = message.getChangeNumber(); |
| | | ReplServerAckMessageList ackList; |
| | |
| | | if (completedFlag) |
| | | { |
| | | ReplicationServerDomain replicationServerDomain = |
| | | ackList.getChangelogCache(); |
| | | ackList.getChangelogCache(); |
| | | replicationServerDomain.sendAck(changeNumber, false, |
| | | ackList.getReplicationServerId()); |
| | | ackList.getReplicationServerId()); |
| | | } |
| | | } |
| | | |
| | |
| | | * @param nbWaitedAck The number of ack that must be received before |
| | | * the update is fully acked. |
| | | */ |
| | | public void addWaitingAck(UpdateMessage update, int nbWaitedAck) |
| | | public void addWaitingAck(UpdateMsg update, int nbWaitedAck) |
| | | { |
| | | AckMessageList ackList = new AckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck); |
| | | synchronized(waitingAcks) |
| | | nbWaitedAck); |
| | | synchronized (waitingAcks) |
| | | { |
| | | waitingAcks.put(update.getChangeNumber(), ackList); |
| | | } |
| | |
| | | * the update is fully acked. |
| | | */ |
| | | public static void addWaitingAck( |
| | | UpdateMessage update, |
| | | short ChangelogServerId, ReplicationServerDomain replicationServerDomain, |
| | | int nbWaitedAck) |
| | | UpdateMsg update, |
| | | short ChangelogServerId, ReplicationServerDomain replicationServerDomain, |
| | | int nbWaitedAck) |
| | | { |
| | | ReplServerAckMessageList ackList = |
| | | new ReplServerAckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck, |
| | | ChangelogServerId, |
| | | replicationServerDomain); |
| | | synchronized(changelogsWaitingAcks) |
| | | new ReplServerAckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck, |
| | | ChangelogServerId, |
| | | replicationServerDomain); |
| | | synchronized (changelogsWaitingAcks) |
| | | { |
| | | changelogsWaitingAcks.put(update.getChangeNumber(), ackList); |
| | | } |
| | |
| | | */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException,InitializationException |
| | | throws ConfigException, InitializationException |
| | | { |
| | | // Nothing to do for now |
| | | } |
| | |
| | | public String getMonitorInstanceName() |
| | | { |
| | | String str = baseDn.toString() + |
| | | " " + serverURL + " " + String.valueOf(serverId); |
| | | " " + serverURL + " " + String.valueOf(serverId); |
| | | |
| | | if (serverIsLDAPserver) |
| | | return "Direct LDAP Server " + str; |
| | | return "Directory Server " + str; |
| | | else |
| | | return "Remote Repl Server " + str; |
| | | return "Remote Replication Server " + str; |
| | | } |
| | | |
| | | /** |
| | |
| | | public void updateMonitorData()
| | | {
| | | // Intentionally empty: as long as getUpdateInterval() returns 0, this
| | | // method will never get called.
| | | 
| | | }
| | | |
| | | /** |
| | |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | if (serverIsLDAPserver) |
| | | { |
| | | attributes.add(new Attribute("LDAP-Server", serverURL)); |
| | | attributes.add(new Attribute("connected-to", this.replicationServerDomain. |
| | | getReplicationServer().getMonitorInstanceName())); |
| | | attributes.add(Attributes.create("LDAP-Server", serverURL)); |
| | | attributes.add(Attributes.create("connected-to", |
| | | this.replicationServerDomain.getReplicationServer() |
| | | .getMonitorInstanceName())); |
| | | |
| | | } |
| | | else |
| | | { |
| | | attributes.add(new Attribute("ReplicationServer-Server", serverURL)); |
| | | attributes.add(Attributes.create("ReplicationServer-Server", |
| | | serverURL)); |
| | | } |
| | | attributes.add(new Attribute("server-id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", |
| | | baseDn.toString())); |
| | | attributes.add(Attributes.create("server-id", String |
| | | .valueOf(serverId))); |
| | | attributes.add(Attributes.create("base-dn", baseDn.toString())); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | |
| | | |
| | | // Oldest missing update |
| | | Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); |
| | | if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0)) |
| | | if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0)) |
| | | { |
| | | Date date = new Date(approxFirstMissingDate); |
| | | attributes.add(new Attribute("approx-older-change-not-synchronized", |
| | | date.toString())); |
| | | attributes.add( |
| | | new Attribute("approx-older-change-not-synchronized-millis", |
| | | String.valueOf(approxFirstMissingDate))); |
| | | attributes.add(Attributes.create( |
| | | "approx-older-change-not-synchronized", date.toString())); |
| | | attributes.add(Attributes.create( |
| | | "approx-older-change-not-synchronized-millis", String |
| | | .valueOf(approxFirstMissingDate))); |
| | | } |
| | | |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChanges(serverId); |
| | | attributes.add(new Attribute("missing-changes", |
| | | String.valueOf(missingChanges))); |
| | | attributes.add(Attributes.create("missing-changes", String |
| | | .valueOf(missingChanges))); |
| | | |
| | | // Replication delay |
| | | long delay = md.getApproxDelay(serverId); |
| | | attributes.add(new Attribute("approximate-delay", |
| | | String.valueOf(delay))); |
| | | attributes.add(Attributes.create("approximate-delay", String |
| | | .valueOf(delay))); |
| | | } |
| | | catch(Exception e) |
| | | catch (Exception e) |
| | | { |
| | | // TODO: improve the log |
| | | // We failed retrieving the remote monitor data. |
| | | attributes.add(new Attribute("error", |
| | | attributes.add(Attributes.create("error", |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | } |
| | | |
| | | attributes.add( |
| | | new Attribute("queue-size", String.valueOf(msgQueue.count()))); |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.count()))); |
| | | attributes.add( |
| | | new Attribute( |
| | | Attributes.create( |
| | | "queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); |
| | | attributes.add( |
| | | new Attribute( |
| | | Attributes.create( |
| | | "following", String.valueOf(following))); |
| | | |
| | | // Deprecated |
| | | attributes.add(new Attribute("max-waiting-changes", |
| | | String.valueOf(maxQueueSize))); |
| | | attributes.add(new Attribute("update-sent", |
| | | String.valueOf(getOutCount()))); |
| | | attributes.add(new Attribute("update-received", |
| | | String.valueOf(getInCount()))); |
| | | attributes.add(Attributes.create("max-waiting-changes", String |
| | | .valueOf(maxQueueSize))); |
| | | attributes.add(Attributes.create("update-sent", String |
| | | .valueOf(getOutCount()))); |
| | | attributes.add(Attributes.create("update-received", String |
| | | .valueOf(getInCount()))); |
| | | |
| | | // Deprecated as long as assured is not exposed |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | | String.valueOf(getWaitingAckSize()))); |
| | | attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount()))); |
| | | attributes.add(new Attribute("ack-received", |
| | | String.valueOf(getInAckCount()))); |
| | | attributes.add(Attributes.create("update-waiting-acks", String |
| | | .valueOf(getWaitingAckSize()))); |
| | | attributes.add(Attributes.create("ack-sent", String |
| | | .valueOf(getOutAckCount()))); |
| | | attributes.add(Attributes.create("ack-received", String |
| | | .valueOf(getInAckCount()))); |
| | | |
| | | // Window stats |
| | | attributes.add(new Attribute("max-send-window", |
| | | String.valueOf(sendWindowSize))); |
| | | attributes.add(new Attribute("current-send-window", |
| | | String.valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(new Attribute("max-rcv-window", |
| | | String.valueOf(maxRcvWindow))); |
| | | attributes.add(new Attribute("current-rcv-window", |
| | | String.valueOf(rcvWindow))); |
| | | |
| | | /* |
| | | * FIXME:PGB DEPRECATED |
| | | * |
| | | // Missing changes |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | // Age of oldest missing change |
| | | |
| | | // Date of the oldest missing change |
| | | long olderUpdateTime = getOlderUpdateTime(); |
| | | if (olderUpdateTime != 0) |
| | | { |
| | | Date date = new Date(getOlderUpdateTime()); |
| | | attributes.add(new Attribute("older-change-not-synchronized", |
| | | String.valueOf(date.toString()))); |
| | | } |
| | | */ |
| | | attributes.add(Attributes.create("max-send-window", String |
| | | .valueOf(sendWindowSize))); |
| | | attributes.add(Attributes.create("current-send-window", String |
| | | .valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(Attributes.create("max-rcv-window", String |
| | | .valueOf(maxRcvWindow))); |
| | | attributes.add(Attributes.create("current-rcv-window", String |
| | | .valueOf(rcvWindow))); |
| | | |
| | | /* get the Server State */ |
| | | final String ATTR_SERVER_STATE = "server-state"; |
| | | AttributeType type = |
| | | DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | AttributeBuilder builder = new AttributeBuilder("server-state"); |
| | | for (String str : serverState.toStringSet()) |
| | | { |
| | | values.add(new AttributeValue(type,str)); |
| | | builder.add(str); |
| | | } |
| | | Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); |
| | | attributes.add(attr); |
| | | attributes.add(builder.toAttribute()); |
| | | |
| | | // Encryption |
| | | attributes.add(new Attribute("ssl-encryption", |
| | | String.valueOf(session.isEncrypted()))); |
| | | attributes.add(Attributes.create("ssl-encryption", String |
| | | .valueOf(session.isEncrypted()))); |
| | | |
| | | // Data generation |
| | | attributes.add(new Attribute("generation-id", |
| | | String.valueOf(generationId))); |
| | | attributes.add(Attributes.create("generation-id", String |
| | | .valueOf(generationId))); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | shutdown = true; |
| | | /* |
| | | * Shutdown ServerWriter |
| | | */ |
| | | shutdownWriter = true; |
| | | activeWriter = false; |
| | | synchronized (msgQueue) |
| | | { |
| | | /* wake up the writer thread on an empty queue so that it disappears */
| | | msgQueue.clear(); |
| | | msgQueue.notify(); |
| | | msgQueue.notifyAll(); |
| | | } |
| | | |
| | | /* |
| | | * Close session to end ServerReader or ServerWriter |
| | | */ |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // Service is closing. |
| | | // ignore. |
| | | } |
| | | |
| | | stopHandler(); |
| | | /* |
| | | * Stop the remote LSHandler |
| | | */ |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | directoryServers.clear(); |
| | | |
| | | /* |
| | | * Stop the heartbeat thread. |
| | | */ |
| | | if (heartbeatThread != null) |
| | | { |
| | | heartbeatThread.shutdown(); |
| | | } |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | |
| | | /* |
| | | * Be sure to wait for ServerWriter and ServerReader death |
| | | * It does not matter if we try to stop a thread which is us (reader |
| | | * or writer), but we must not wait for our own thread death. |
| | | */ |
| | | try |
| | | { |
| | | if (writer != null) { |
| | | if ((writer != null) && (!(Thread.currentThread().equals(writer)))) |
| | | { |
| | | |
| | | writer.join(SHUTDOWN_JOIN_TIMEOUT); |
| | | } |
| | | if (reader != null) { |
| | | if ((reader != null) && (!(Thread.currentThread().equals(reader)))) |
| | | { |
| | | reader.join(SHUTDOWN_JOIN_TIMEOUT); |
| | | } |
| | | } catch (InterruptedException e) |
| | |
| | | |
| | | |
| | | localString += serverId + " " + serverURL + " " + baseDn; |
| | | } |
| | | else |
| | | } else |
| | | localString = "Unknown server"; |
| | | |
| | | return localString; |
| | |
| | | |
| | | /** |
| | | * Decrement the protocol window, then check if it is necessary |
| | | * to send a WindowMessage and send it. |
| | | * to send a WindowMsg and send it. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check the protocol window and send WindowMessage if necessary. |
| | | * Check the protocol window and send WindowMsg if necessary. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | |
| | | } |
| | | if (!flowControl) |
| | | { |
| | | WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); |
| | | WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | rcvWindow += rcvWindowSizeHalf; |
| | |
| | | * @param windowMsg The Window Message containing the information |
| | | * necessary for updating the window size. |
| | | */ |
| | | public void updateWindow(WindowMessage windowMsg) |
| | | public void updateWindow(WindowMsg windowMsg) |
| | | { |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | |
| | | * |
| | | * @param msg The message to be processed. |
| | | */ |
| | | public void process(RoutableMessage msg) |
| | | public void process(RoutableMsg msg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " processes received msg=" + msg); |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + ":" + |
| | | "\nprocesses received msg:\n" + msg); |
| | | replicationServerDomain.process(msg, this); |
| | | } |
| | | |
| | | /** |
| | | * Sends the provided ReplServerInfoMessage. |
| | | * Sends the provided TopologyMsg to the peer server. |
| | | * |
| | | * @param info The ReplServerInfoMessage message to be sent. |
| | | * @param topoMsg The TopologyMsg message to be sent. |
| | | * @throws IOException When it occurs while sending the message, |
| | | * |
| | | */ |
| | | public void sendInfo(ReplServerInfoMessage info) |
| | | throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sends message=" + info); |
| | | public void sendTopoInfo(TopologyMsg topoMsg) |
| | | throws IOException |
| | | { |
| | | // V1 Rs do not support the TopologyMsg |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + ":" + |
| | | "\nsends message:\n" + topoMsg); |
| | | |
| | | session.publish(info); |
| | | } |
| | | session.publish(topoMsg); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * |
| | | * Sets the replication server from the message provided. |
| | | * |
| | | * @param infoMsg The information message. |
| | | */ |
| | | public void receiveReplServerInfo(ReplServerInfoMessage infoMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sets replServerInfo " + "<" + infoMsg + ">"); |
| | | /** |
| | | * Stores topology information received from a peer RS and that must be kept |
| | | * in RS handler. |
| | | * |
| | | * @param topoMsg The received topology message |
| | | */ |
| | | public void receiveTopoInfoFromRS(TopologyMsg topoMsg) |
| | | { |
| | | // Store info for remote RS |
| | | List<RSInfo> rsInfos = topoMsg.getRsList(); |
| | | // List should only contain RS info for sender |
| | | RSInfo rsInfo = rsInfos.get(0); |
| | | generationId = rsInfo.getGenerationId(); |
| | | groupId = rsInfo.getGroupId(); |
| | | |
| | | List<String> newRemoteLDAPservers = infoMsg.getConnectedServers(); |
| | | generationId = infoMsg.getGenerationId(); |
| | | /** |
| | | * Store info for DSs connected to the peer RS |
| | | */ |
| | | List<DSInfo> dsInfos = topoMsg.getDsList(); |
| | | |
| | | synchronized(connectedServers) |
| | | { |
| | | // Removes the existing structures |
| | | for (LightweightServerHandler lsh : connectedServers.values()) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | connectedServers.clear(); |
| | | // Removes the existing structures |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | directoryServers.clear(); |
| | | |
| | | // Creates the new structure according to the message received. |
| | | for (String newConnectedServer : newRemoteLDAPservers) |
| | | { |
| | | LightweightServerHandler lsh |
| | | = new LightweightServerHandler(newConnectedServer, this); |
| | | lsh.startHandler(); |
| | | connectedServers.put(lsh.getServerId(), lsh); |
| | | } |
| | | } |
| | | } |
| | | // Creates the new structure according to the message received. |
| | | for (DSInfo dsInfo : dsInfos) |
| | | { |
| | | LightweightServerHandler lsh = new LightweightServerHandler(this, |
| | | serverId, dsInfo.getDsId(), dsInfo.getGenerationId(), |
| | | dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(), |
| | | dsInfo.isAssured(), dsInfo.getAssuredMode(), |
| | | dsInfo.getSafeDataLevel()); |
| | | lsh.startHandler(); |
| | | directoryServers.put(lsh.getServerId(), lsh); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * When this handler is connected to a replication server, specifies if |
| | | * a wanted server is connected to this replication server. |
| | | * |
| | | * @param wantedServer The server we want to know if it is connected |
| | | * to the replication server represented by this handler. |
| | | * @return boolean True is the wanted server is connected to the server |
| | | * represented by this handler. |
| | | */ |
| | | public boolean isRemoteLDAPServer(short wantedServer) |
| | | { |
| | | synchronized(connectedServers) |
| | | { |
| | | for (LightweightServerHandler server : connectedServers.values()) |
| | | { |
| | | if (wantedServer == server.getServerId()) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | } |
| | | /** |
| | | * Process message of a remote server changing his status. |
| | | * @param csMsg The message containing the new status |
| | | * @return The new server status of the DS |
| | | */ |
| | | public ServerStatus processNewStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | |
| | | /** |
| | | * When the handler is connected to a replication server, specifies the |
| | | * replication server has remote LDAP servers connected to it. |
| | | * |
| | | * @return boolean True is the replication server has remote LDAP servers |
| | | * connected to it. |
| | | */ |
| | | public boolean hasRemoteLDAPServers() |
| | | { |
| | | return !connectedServers.isEmpty(); |
| | | } |
| | | // Sanity check |
| | | if (!serverIsLDAPserver) |
| | | { |
| | | Message msg = |
| | | ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(baseDn.toString(), |
| | | Short.toString(serverId), csMsg.toString()); |
| | | logError(msg); |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | |
| | | // Get the status the DS just entered |
| | | ServerStatus reqStatus = csMsg.getNewStatus(); |
| | | // Translate new status to a state machine event |
| | | StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | Message msg = ERR_RS_INVALID_NEW_STATUS.get(reqStatus.toString(), |
| | | baseDn.toString(), Short.toString(serverId)); |
| | | logError(msg); |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | |
| | | // Check state machine allows this new status |
| | | ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return ServerStatus.INVALID_STATUS; |
| | | } |
| | | |
| | | status = newStatus; |
| | | |
| | | return status; |
| | | } |
| | | |
| | | /**
| | |  * Applies an event raised by the status analyzer: computes the resulting
| | |  * status through the status machine and, when the transition is allowed,
| | |  * publishes a ChangeStatusMsg on the session and records the new status.
| | |  * @param event The event to be used for new status computation
| | |  * @return The new status of the DS, or the invalid status when the status
| | |  *         machine refuses the transition (nothing is sent in that case)
| | |  * @throws IOException When raised by the underlying session
| | |  */
| | | public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
| | |   throws IOException
| | | {
| | |   // Sanity check: ask the state machine whether the transition is allowed
| | |   final ServerStatus computedStatus =
| | |     StatusMachine.computeNewStatus(status, event);
| | | 
| | |   if (computedStatus == ServerStatus.INVALID_STATUS)
| | |   {
| | |     // The status analyzer must only switch between NORMAL_STATUS and
| | |     // DEGRADED_STATUS. We may be trying to change the status just after
| | |     // another status has been entered (e.g. a full update has just been
| | |     // engaged): in that case the attempt is reported and then ignored.
| | |     Message refusal = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
| | |       Short.toString(serverId), status.toString(), event.toString());
| | |     logError(refusal);
| | |     return computedStatus;
| | |   }
| | | 
| | |   // Build the message requesting the DS to change its status
| | |   final ChangeStatusMsg changeRequest =
| | |     new ChangeStatusMsg(computedStatus, ServerStatus.INVALID_STATUS);
| | | 
| | |   if (debugEnabled())
| | |   {
| | |     String trace = "In RS " +
| | |       replicationServerDomain.getReplicationServer().getServerId() +
| | |       " Sending change status from status analyzer to " + getServerId() +
| | |       " for baseDn " + baseDn + ":\n" + changeRequest;
| | |     TRACER.debugInfo(trace);
| | |   }
| | | 
| | |   session.publish(changeRequest);
| | | 
| | |   // Only record the new status once the request has been published
| | |   status = computedStatus;
| | |   return computedStatus;
| | | }
| | | |
| | | /**
| | |  * When this handler is connected to a replication server, specifies if
| | |  * a wanted server is connected to this replication server.
| | |  *
| | |  * @param wantedServer The server we want to know if it is connected
| | |  * to the replication server represented by this handler.
| | |  * @return boolean True if the wanted server is connected to the server
| | |  * represented by this handler.
| | |  */
| | | public boolean isRemoteLDAPServer(short wantedServer)
| | | {
| | |   synchronized (directoryServers)
| | |   {
| | |     // directoryServers is keyed by the server id of each handler (see
| | |     // receiveTopoInfoFromRS), so a key lookup replaces the former scan of
| | |     // all the values.
| | |     return directoryServers.containsKey(wantedServer);
| | |   }
| | | }
| | | |
| | | /**
| | |  * When the handler is connected to a replication server, tells whether
| | |  * that replication server has remote LDAP servers connected to it.
| | |  *
| | |  * @return boolean True if at least one entry is registered in
| | |  *         directoryServers, false when it is empty.
| | |  */
| | | public boolean hasRemoteLDAPServers()
| | | {
| | |   final boolean noRemoteServer = directoryServers.isEmpty();
| | |   return !noRemoteServer;
| | | }
| | | |
| | | /** |
| | | * Send an InitializeRequestMessage to the server connected through this |
| | |
| | | * @param msg The message to be processed |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void send(RoutableMessage msg) throws IOException |
| | | public void send(RoutableMsg msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sends message=" + msg); |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + ":" + |
| | | "\nsends message:\n" + msg); |
| | | session.publish(msg); |
| | | } |
| | | |
| | | /** |
| | | * Send an ErrorMessage to the peer. |
| | | * Send an ErrorMsg to the peer. |
| | | * |
| | | * @param errorMsg The message to be sent |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void sendError(ErrorMessage errorMsg) throws IOException |
| | | public void sendError(ErrorMsg errorMsg) throws IOException |
| | | { |
| | | session.publish(errorMsg); |
| | | } |
| | | |
| | | /** |
| | | * Process the reception of a WindowProbe message. |
| | | * Process the reception of a WindowProbeMsg message. |
| | | * |
| | | * @param windowProbeMsg The message to process. |
| | | * |
| | | * @throws IOException When the session becomes unavailable. |
| | | */ |
| | | public void process(WindowProbe windowProbeMsg) throws IOException |
| | | public void process(WindowProbeMsg windowProbeMsg) throws IOException |
| | | { |
| | | if (rcvWindow > 0) |
| | | { |
| | |
| | | // let's update the LDAP server with our current window size and hope
| | | // that everything will work better in the future.
| | | // TODO also log an error message. |
| | | WindowMessage msg = new WindowMessage(rcvWindow); |
| | | WindowMsg msg = new WindowMsg(rcvWindow); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | // Both the LDAP server and the replication server believes that the |
| | | // window is closed. Lets check the flowcontrol in case we |
| | |
| | | } |
| | | |
| | | /**
| | |  * Warns the peer that its generation id is no longer valid for this
| | |  * domain. An error message built from NOTE_RESET_GENERATION_ID is
| | |  * published on the session.
| | |  * <p>
| | |  * This is a best effort notification: a failure to build or send the
| | |  * message is only traced in the debug log and is never propagated to the
| | |  * caller.
| | |  */
| | | public void warnBadGenerationId()
| | | {
| | |   // Notify the peer that it is now invalid regarding the generationId
| | |   // We are now waiting a startServer message from this server with
| | |   // a valid generationId.
| | |   try
| | |   {
| | |     Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
| | |     ErrorMessage errorMsg =
| | |       new ErrorMessage(serverId, replicationServerId, message);
| | |     session.publish(errorMsg);
| | |   }
| | |   catch (Exception e)
| | |   {
| | |     // Keep the notification best effort, but leave a trace of the failure
| | |     // instead of dropping it silently.
| | |     if (debugEnabled())
| | |     {
| | |       TRACER.debugInfo(
| | |         "Failed to send reset generation id error message to server " +
| | |         serverId + ": " + stackTraceToSingleLineString(e));
| | |     }
| | |   }
| | | }
| | | |
| | | /** |
| | | * Sends a message containing a generationId to a peer server. |
| | | * The peer is expected to be a replication server. |
| | | * |
| | |
| | | * @throws IOException When it occurs while sending the message, |
| | | * |
| | | */ |
| | | public void forwardGenerationIdToRS(ResetGenerationId msg) |
| | | throws IOException |
| | | public void forwardGenerationIdToRS(ResetGenerationIdMsg msg) |
| | | throws IOException |
| | | { |
| | | session.publish(msg); |
| | | } |
| | |
| | | * Return a Set containing the servers known by this replicationServer. |
| | | * @return a set containing the servers known by this replicationServer. |
| | | */ |
| | | public Set<Short> getConnectedServerIds() |
| | | public Set<Short> getConnectedDirectoryServerIds() |
| | | { |
| | | return connectedServers.keySet(); |
| | | return directoryServers.keySet(); |
| | | } |
| | | |
| | | /**
| | | * Get the map of connected DSs
| | | * (to the RS represented by this server handler).
| | | * The internal directoryServers map itself is returned, not a copy, so
| | | * changes made through the returned reference affect this handler.
| | | * @return The map of connected DSs, keyed by DS server id
| | | */
| | | public Map<Short, LightweightServerHandler> getConnectedDSs()
| | | {
| | | return directoryServers;
| | | }
| | | |
| | | /**
| | |  * Reacts to a new reference generation id: depending on how it compares
| | |  * with the generation id of the peer DS, either closes the connection,
| | |  * does nothing, or orders the DS to change its status.
| | |  * <ul>
| | |  * <li>new id is not -1 and equals the DS generation id, DS in
| | |  * BAD_GEN_ID_STATUS: the session is closed to force a reconnection;</li>
| | |  * <li>new id is not -1 and equals the DS generation id, any other status:
| | |  * nothing is sent;</li>
| | |  * <li>otherwise (new id is -1 or differs): the DS is asked to enter the
| | |  * status computed for TO_BAD_GEN_ID_STATUS_EVENT, unless it is in
| | |  * FULL_UPDATE_STATUS or the status machine refuses the transition.</li>
| | |  * </ul>
| | |  * @param newGenId The new generation id to take into account
| | |  * @throws IOException If IO error occurred.
| | |  */
| | | public void changeStatusForResetGenId(long newGenId)
| | |   throws IOException
| | | {
| | |   // -1 means the generation id is being made invalid: it never counts as
| | |   // a match, even against a DS generation id of -1.
| | |   final boolean dsHasNewGenId =
| | |     (newGenId != -1) && (newGenId == generationId);
| | | 
| | |   if (dsHasNewGenId)
| | |   {
| | |     if (status != ServerStatus.BAD_GEN_ID_STATUS)
| | |     {
| | |       // DS already has the right generation id and a usable status
| | |       if (debugEnabled())
| | |       {
| | |         TRACER.debugInfo(
| | |           "In RS " +
| | |           replicationServerDomain.getReplicationServer().getServerId() +
| | |           ". DS " + getServerId() + " for baseDn " + baseDn +
| | |           " has already generation id " + newGenId +
| | |           " so no ChangeStatusMsg sent to him.");
| | |       }
| | |       return;
| | |     }
| | | 
| | |     // This server has the good new reference generation id.
| | |     // Close connection with him to force his reconnection: DS will
| | |     // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
| | |     if (debugEnabled())
| | |     {
| | |       TRACER.debugInfo(
| | |         "In RS " +
| | |         replicationServerDomain.getReplicationServer().getServerId() +
| | |         ". Closing connection to DS " + getServerId() +
| | |         " for baseDn " + baseDn + " to force reconnection as new local" +
| | |         " generation id and remote one match and DS is in bad gen id: " +
| | |         newGenId);
| | |     }
| | | 
| | |     // Connection closure must not be done calling RSD.stopHandler() as it
| | |     // would wait again for the RSD lock that we already hold when
| | |     // entering this method, i.e. a reentrant lock which we do not want.
| | |     // So simply close the session: the hang up will show up after the
| | |     // reader thread that took the RSD lock releases it.
| | |     try
| | |     {
| | |       if (session != null)
| | |       {
| | |         session.close();
| | |       }
| | |     } catch (IOException e)
| | |     {
| | |       // ignore
| | |     }
| | | 
| | |     // NOT_CONNECTED_STATUS is the last one in RS session life: handler
| | |     // will soon disappear after this method call...
| | |     status = ServerStatus.NOT_CONNECTED_STATUS;
| | |     return;
| | |   }
| | | 
| | |   // Either the generation id is being made invalid (-1) or the DS has a
| | |   // bad generation id compared to the new reference one: in both cases
| | |   // the DS must be put into BAD_GEN_ID_STATUS.
| | |   final StatusMachineEvent event =
| | |     StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
| | | 
| | |   if (status == ServerStatus.FULL_UPDATE_STATUS)
| | |   {
| | |     // Prevent useless error message (full update status cannot lead to bad
| | |     // gen status)
| | |     Message notice = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
| | |       Short.toString(replicationServerDomain.
| | |       getReplicationServer().getServerId()),
| | |       baseDn.toString(),
| | |       Short.toString(serverId),
| | |       Long.toString(generationId),
| | |       Long.toString(newGenId));
| | |     logError(notice);
| | |     return;
| | |   }
| | | 
| | |   final ServerStatus computedStatus =
| | |     StatusMachine.computeNewStatus(status, event);
| | | 
| | |   if (computedStatus == ServerStatus.INVALID_STATUS)
| | |   {
| | |     Message refusal = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
| | |       Short.toString(serverId), status.toString(), event.toString());
| | |     logError(refusal);
| | |     return;
| | |   }
| | | 
| | |   // Send message requesting to change the DS status
| | |   final ChangeStatusMsg changeRequest =
| | |     new ChangeStatusMsg(computedStatus, ServerStatus.INVALID_STATUS);
| | | 
| | |   if (debugEnabled())
| | |   {
| | |     TRACER.debugInfo(
| | |       "In RS " +
| | |       replicationServerDomain.getReplicationServer().getServerId() +
| | |       " Sending change status for reset gen id to " + getServerId() +
| | |       " for baseDn " + baseDn + ":\n" + changeRequest);
| | |   }
| | | 
| | |   session.publish(changeRequest);
| | | 
| | |   status = computedStatus;
| | | }
| | | |
| | | /**
| | |  * Raises the shut down flag and reports the value it had before the call.
| | |  * @return The previous value of the shut down flag
| | |  */
| | | public boolean engageShutdown()
| | | {
| | |   // Read and set in a single getAndSet call on the thread safe boolean
| | |   final boolean previouslyEngaged = shuttingDown.getAndSet(true);
| | |   return previouslyEngaged;
| | | }
| | | |
| | | /**
| | |  * Returns the status currently recorded by this handler for the
| | |  * connected DS.
| | |  * @return The status of the connected DS.
| | |  */
| | | public ServerStatus getStatus()
| | | {
| | |   return this.status;
| | | }
| | | |
| | | /**
| | |  * Returns the protocol version in use with the remote server handled by
| | |  * this object.
| | |  * @return The protocol version used with this remote server.
| | |  */
| | | public short getProtocolVersion()
| | | {
| | |   return this.protocolVersion;
| | | }
| | | } |