 *
 * Copyright 2006-2010 Sun Microsystems, Inc.
 * Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;


  private final DN baseDN;

  /**
   * The status analyzer thread. Periodically verifies whether the connected
   * DSs are late and publishes any pending status messages.
   */
  private final StatusAnalyzer statusAnalyzer;

  /**
   * The monitoring publisher that periodically sends monitoring messages to
   * the

  private int assuredTimeoutTimerPurgeCounter = 0;

  /**
   * Stores pending status messages such as DS change time heartbeats for
   * future forwarding to the rest of the topology. This class is required in
   * order to decouple inbound IO processing from outbound IO processing and
   * avoid potential inter-process deadlocks. In particular, the
   * {@code ServerReader} thread must not send messages.
   */
  private static class PendingStatusMessages
  {
    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private boolean sendRSTopologyMsg;
    private boolean sendDSTopologyMsg;
    private int excludedDSForTopologyMsg = -1;

    /**
     * Enqueues a TopologyMsg for all the connected directory servers in order
     * to let them know the topology (all known DSs and RSs).
     *
     * @param excludedDS
     *          If not null, the topology message will not be sent to this DS.
     */
    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
    {
      int excludedServerId =
          excludedDS != null ? excludedDS.getServerId() : -1;
      if (sendDSTopologyMsg)
      {
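        // A topology message is already pending. If it excluded a different
        // DS than the one supplied here, widen the pending message to
        // exclude nobody so that both servers receive the update.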
        if (excludedServerId != excludedDSForTopologyMsg)
        {
          excludedDSForTopologyMsg = -1;
        }
      }
      else
      {
        sendDSTopologyMsg = true;
        excludedDSForTopologyMsg = excludedServerId;
      }
    }

    /**
     * Enqueues a TopologyMsg for all the connected replication servers in
     * order to let them know our connected LDAP servers.
     */
    private void enqueueTopoInfoToAllRSs()
    {
      sendRSTopologyMsg = true;
    }

    /**
     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
     * all other RS instances.
     *
     * @param msg
     *          The heartbeat message.
     */
    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
    {
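      // Keyed by the replica's serverId: only the most recent pending
      // heartbeat per replica is kept; an older pending one is overwritten.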
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }

    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
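      // Likewise, keep only the latest pending monitor response per
      // requesting server until the status analyzer drains it.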
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }

    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }
  }

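  /*
   * Threads that receive messages (e.g. ServerReader) enqueue status
   * notifications into pendingStatusMessages while holding
   * pendingStatusMessagesLock, then wake up the status analyzer, which
   * drains and sends them from its own thread (see
   * sendPendingStatusMessages()).
   */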
  private final Object pendingStatusMessagesLock = new Object();

  /** @GuardedBy("pendingStatusMessagesLock") */
  private PendingStatusMessages pendingStatusMessages =
      new PendingStatusMessages();

  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
   * @param baseDN

            + ") assured timer for domain \"" + baseDN + "\"", true);
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();

    this.statusAnalyzer = new StatusAnalyzer(this);
    this.statusAnalyzer.start();
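    // The status analyzer thread lives for the whole lifetime of this
    // domain; it is only stopped by shutdown().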
    DirectoryServer.registerMonitorProvider(this);
  }

   * @param ack The ack message received.
   * @param ackingServer The server handler of the server that sent the ack.
   */
  void processAck(AckMsg ack, ServerHandler ackingServer)
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.

    if (connectedRSs.containsKey(sHandler.getServerId()))
    {
      unregisterServerHandler(sHandler, shutdown, false);
    }
    else if (connectedDSs.containsKey(sHandler.getServerId()))
    {
      unregisterServerHandler(sHandler, shutdown, true);
    }
    else if (otherHandlers.contains(sHandler))
    {
      unregisterOtherHandler(sHandler);
    }

    resetGenerationIdIfPossible();
    if (!shutdown)
    {
      synchronized (pendingStatusMessagesLock)
      {
        if (isDirectoryServer)
        {
          // Update the remote replication servers with our list
          // of connected LDAP servers
          pendingStatusMessages.enqueueTopoInfoToAllRSs();
        }
        // Warn our DSs that a RS or DS has quit (does not use this
        // handler as already removed from list)
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }

   *
   * @param msg
   *          The message received and to be processed.
   * @param sender
   *          The server handler of the server that sent the message.
   */
  void process(RoutableMsg msg, ServerHandler sender)
  {
    if (msg.getDestination() == localReplicationServer.getServerId())
    {
      // Handle routable messages targeted at this RS.
      if (msg instanceof ErrorMsg)
      {
        ErrorMsg errorMsg = (ErrorMsg) msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
      }
      else
      {
        replyWithUnroutableMsgType(sender, msg);
      }
    }
    else
    {
      // Forward the message when it is not destined for this RS.
      List<ServerHandler> servers = getDestinationServers(msg, sender);
      if (!servers.isEmpty())
      {
        forwardMsgToAllServers(msg, servers, sender);
      }
      else
      {
        replyWithUnreachablePeerMsg(sender, msg);
      }
    }
  }
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   * @param sender
   *          The DS/RS which sent the monitor request.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    enqueueMonitorMsg(msg, sender);
  }

  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message.
   * @param sender
   *          The DS/RS which sent the monitor message.
   */
  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
  {
    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
  }

  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,

  }

  private void forwardMsgToAllServers(RoutableMsg msg,
      List<ServerHandler> servers, ServerHandler sender)
  {
    for (ServerHandler targetHandler : servers)
    {

        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
        try
        {
          sender.send(errMsg);
        }
        catch (IOException ioe1)
        {
          // An error happened on the sender session while trying to recover
          // from an error on the receiver session. We do not have many
          // options left besides closing both sessions.
          stopServer(sender, false);
          stopServer(targetHandler, false);
        }
        // TODO Handle error properly (sender timeout in addition)

   * @return The newly created and filled MonitorMsg.
   */
  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
    monitorMsg.setReplServerDbState(getLatestServerState());

    // Add the server state for each connected DS and RS.
    for (DataServerHandler dsHandler : this.connectedDSs.values())
    {
      monitorMsg.setServerState(dsHandler.getServerId(),
          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
          true);
    }

    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
    {
      monitorMsg.setServerState(rsHandler.getServerId(),
          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
          false);
    }

    return monitorMsg;
  }

  /**

    assuredTimeoutTimer.cancel();

    stopAllServers(true);
    statusAnalyzer.shutdown();
  }

  /**

    return "ReplicationServerDomain " + baseDN;
  }

  /**
   * Creates a TopologyMsg filled with information to be sent to a remote RS.

      return;
    }

    enqueueTopoInfoToAllExcept(senderHandler);

    Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
        senderHandler.getServerId(), baseDN.toNormalizedString(),
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try

      return false;
    }

      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {

   */
  public void sendTopoInfoToAll()
  {
    enqueueTopoInfoToAllExcept(null);
  }

  /**

   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    synchronized (pendingStatusMessagesLock)
    {
      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
      pendingStatusMessages.enqueueTopoInfoToAllRSs();
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }

  /**

       * Sends the currently known topology information to every connected
       * DS we have.
       */
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch (Exception e)
    {

    }

  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  private void startMonitoringPublisher()

  }

  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  {
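    // Best-effort delivery: try the send twice, sleeping briefly between
    // attempts, and log an error only if the second attempt also fails.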
    for (int i = 1; i <= 2; i++)
    {
      if (!handler.shuttingDown()
          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
      {
        try
        {
          handler.sendTopoInfo(msg);
          break;
        }
        catch (IOException e)
        {
          if (i == 2)
          {
            logError(ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                baseDN.toNormalizedString(), type,
                String.valueOf(handler.getServerId()), e.getMessage()));
          }
        }
      }
      sleep(100);
    }
  }

  /**
   * Processes a received ChangeTimeHeartbeatMsg by storing the CSN
   * (timestamp) value and forwarding the message to the other RSes.
   *
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   */
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg)
  {
    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    if (senderHandler.isDataServer())
    {
      /*
       * If we are the first replication server warned, then forward the
       * message to the remote replication servers.
       */
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }

  /**
   * Get the latest (most recent) trim date of the changelog dbs associated
   * to this domain.

  }

  /**
   * Update the monitoring publisher with the new period value.
   *
   * @param period

   */
  public void register(DataServerHandler dsHandler)
  {
    startMonitoringPublisher();

    // connected with new DS: store handler.

    // Tell peer RSs and DSs a new DS just connected to us.
    // No need to re-send TopologyMsg to this just new DS.
    enqueueTopoInfoToAllExcept(dsHandler);
  }

  /**

        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }

  /**
   * Goes through each connected DS, gets the number of pending changes we
   * have for it, and changes its status accordingly if the threshold value
   * is crossed or uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    // A threshold value of 0 means the degrading system is disabled and no
    // check is needed, but test it here for sanity's sake.
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : connectedDSs.values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": " + "Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }

        // Check the status to know if it is relevant to change it. Do not
        // take the RSD lock to test: if we attempt to change the status when
        // the current status does not allow it, this will be noticed by the
        // changeStatus() method. This allows taking the lock only when
        // really needed, rather than on every pass of the analyzer.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }

  /**
   * Sends any enqueued status messages to the rest of the topology.
   */
  void sendPendingStatusMessages()
  {
    /*
     * Take a snapshot of the pending status notifications in order to avoid
     * holding the broadcast lock for too long. In addition, clear the
     * notifications so that they are not resent the next time.
     */
    final PendingStatusMessages savedState;
    synchronized (pendingStatusMessagesLock)
    {
      savedState = pendingStatusMessages;
      pendingStatusMessages = new PendingStatusMessages();
    }
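    // From this point on, newly received status messages accumulate in the
    // fresh PendingStatusMessages instance and are sent on the next pass.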
    sendPendingChangeTimeHeartbeatMsgs(savedState);
    sendPendingTopologyMsgs(savedState);
    sendPendingMonitorMsgs(savedState);
  }

  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
  {
    for (Entry<Integer, MonitorMsg> msg
        : pendingMsgs.pendingDSMonitorMsgs.entrySet())
    {
      ServerHandler ds = connectedDSs.get(msg.getKey());
      if (ds != null)
      {
        try
        {
          ds.send(msg.getValue());
        }
        catch (IOException e)
        {
          // Ignore: connection closed.
        }
      }
    }
    for (Entry<Integer, MonitorMsg> msg
        : pendingMsgs.pendingRSMonitorMsgs.entrySet())
    {
      ServerHandler rs = connectedRSs.get(msg.getKey());
      if (rs != null)
      {
        try
        {
          rs.send(msg.getValue());
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.

          // FIXME: why do we log for RSs but not DSs?
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(String.valueOf(
              msg.getValue().getDestination())));
        }
      }
    }
  }

  private void sendPendingChangeTimeHeartbeatMsgs(
      PendingStatusMessages pendingMsgs)
  {
    for (ChangeTimeHeartbeatMsg pendingHeartbeat
        : pendingMsgs.pendingHeartbeats.values())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
          {
            rsHandler.send(pendingHeartbeat);
          }
        }
        catch (IOException e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get("Replication Server "
              + localReplicationServer.getReplicationPort() + " " + baseDN
              + " " + localReplicationServer.getServerId()));
          stopServer(rsHandler, false);
        }
      }
    }
  }

  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
  {
    if (pendingMsgs.sendDSTopologyMsg)
    {
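      // Skip the excluded DS, if any: typically the server whose own
      // connection or status change triggered this update and which was
      // already sent the topology directly.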
      for (ServerHandler handler : connectedDSs.values())
      {
        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
        {
          final TopologyMsg topoMsg =
              createTopologyMsgForDS(handler.getServerId());
          sendTopologyMsg("directory", handler, topoMsg);
        }
      }
    }

    if (pendingMsgs.sendRSTopologyMsg)
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
      {
        sendTopologyMsg("replication", handler, topoMsg);
      }
    }
  }

  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with
     * the full list of all the servers in the topology.
     */
    if (sender.isDataServer())
    {
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(),
          domainMonitor.getMonitorData());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    else
    {
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
}