import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.opends.messages.Message;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;

import com.sleepycat.je.DatabaseException;
/**
 * This class defines an in-memory cache that will be used to store

  /**
   * The monitor data consolidated over the topology.
   */
  private volatile MonitorData monitorData = new MonitorData();

  // This lock guards against multiple concurrent monitor data recalculations.
  private final Object pendingMonitorLock = new Object();

  // Guarded by pendingMonitorLock.
  private long monitorDataLastBuildDate = 0;

  // The set of replication servers which are already known to be slow to send
  // monitor data.
  //
  // Guarded by pendingMonitorLock.
  private Set<Integer> monitorDataLateServers = new HashSet<Integer>();

  // This lock serializes updates to the pending monitor data.
  private final Object pendingMonitorDataLock = new Object();

  // Monitor data which is currently being calculated.
  //
  // Guarded by pendingMonitorDataLock.
  private MonitorData pendingMonitorData;

  // A set containing the IDs of the servers from which we are currently
  // expecting monitor responses. When a response is received from a server we
  // remove its ID from this set, and count down the latch if the ID was
  // present.
  //
  // Guarded by pendingMonitorDataLock.
  private final Set<Integer> pendingMonitorDataServerIDs =
      new HashSet<Integer>();

  // This latch is non-null when a monitor request is in progress and is used
  // to count incoming responses as they arrive. Since responses may arrive at
  // any time, even when there is no pending monitor request, access to the
  // latch must be guarded.
  //
  // Guarded by pendingMonitorDataLock.
  private CountDownLatch pendingMonitorDataLatch = null;

  // TODO: the remote monitor data cache lifetime (500ms) should be
  // configurable.
  private final long monitorDataLifeTime = 500;

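  // Overview (descriptive note, added for clarity): the fields above
  // implement a small scatter/gather protocol. The recalculating thread,
  // serialized by pendingMonitorLock, fills pendingMonitorDataServerIDs under
  // pendingMonitorDataLock with the IDs of the peer RSes it queries, creates
  // pendingMonitorDataLatch with that count, and awaits it with a timeout.
  // Each incoming MonitorMsg response removes its sender's ID from the set
  // and counts the latch down, so the waiter resumes as soon as every
  // expected response has arrived, or after the timeout when some RSes are
  // late.
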
  /**
   * The needed info for each received assured update message we are waiting

  // every n treated assured messages
  private int assuredTimeoutTimerPurgeCounter = 0;

  private ServerState ctHeartbeatState = null;

  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.

   * domain.
   * @param handler the provided handler to unregister.
   */
  private void unregisterServerHandler(ServerHandler handler)
  {
    if (handler.isReplicationServer())
    {

   * - traverse the replicationServers list and test for each whether DSs are
   *   connected.
   * So it strongly relies on the directoryServers list.
   */
  private void mayResetGenerationId()
  {
    if (debugEnabled())
      TRACER.debugInfo(

   * @param senderHandler The handler of the server that published this
   *          message.
   * @return The list of destination handlers.
   */
  private List<ServerHandler> getDestinationServers(RoutableMsg msg,
      ServerHandler senderHandler)
  {
    List<ServerHandler> servers =

    if (senderHandler.isDataServer())
    {
      // Monitoring information requested by a DS.
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(), monitorData);

      if (monitorMsg != null)
      {
        try
        {
          senderHandler.send(monitorMsg);
        }
        catch (IOException e)
        {
          // The connection was closed.
        }

      }
    }
    else if (msg instanceof MonitorMsg)
    {
      MonitorMsg monitorMsg = (MonitorMsg) msg;
      receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
    }
    else
    {
      logError(NOTE_ERR_ROUTING_TO_SERVER.get(

  }

  /**
   * Creates a new monitor message including monitoring information for the
   * whole topology.
   *
   * @param sender
   *          The sender of this message.
   * @param destination
   *          The destination of this message.
   * @param monitorData
   *          The domain monitor data which should be used for the message.
   * @return The newly created and filled MonitorMsg.
   */
  public MonitorMsg createGlobalTopologyMonitorMsg(
      int sender, int destination, MonitorData monitorData)
  {
    MonitorMsg returnMsg = new MonitorMsg(sender, destination);
    returnMsg.setReplServerDbState(getDbServerState());

    // Add the information about the Replicas currently in
    // the topology.
    Iterator<Integer> it = monitorData.ldapIterator();
    while (it.hasNext())
    {
      int replicaId = it.next();
      returnMsg.setServerState(replicaId,
          monitorData.getLDAPServerState(replicaId),
          monitorData.getApproxFirstMissingDate(replicaId), true);
    }

    // Add the information about the Replication Servers
    // currently in the topology.
    it = monitorData.rsIterator();
    while (it.hasNext())
    {
      int replicaId = it.next();
      returnMsg.setServerState(replicaId,
          monitorData.getRSStates(replicaId),
          monitorData.getRSApproxFirstMissingDate(replicaId), false);
    }

    return returnMsg;
  }
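
  // Illustrative call (a sketch, not part of the original code): when
  // answering a monitor request from a DS, the cached domain monitor data
  // can be passed in directly so the calling thread never blocks on a
  // recalculation:
  //
  //   MonitorMsg reply = createGlobalTopologyMonitorMsg(
  //       request.getDestination(), request.getSenderID(),
  //       getDomainMonitorData());
  //   senderHandler.send(reply);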

  /*
   * Monitor Data generation
   * =======================
   */

  /**
   * Returns the latest monitor data available for this replication server
   * domain.
   *
   * @return The latest monitor data available for this replication server
   *         domain, which is never {@code null}.
   */
  MonitorData getDomainMonitorData()
  {
    return monitorData;
  }


  /**
   * Recomputes the monitor data for this replication server domain.
   *
   * @return The recomputed monitor data for this replication server domain.
   * @throws InterruptedException
   *           If this thread is interrupted while waiting for a response.
   */
  MonitorData computeDomainMonitorData() throws InterruptedException
  {
    // Only allow one monitor data recalculation at a time.
    synchronized (pendingMonitorLock)
    {
      if (monitorDataLastBuildDate + monitorDataLifeTime < TimeThread
          .getTime())
      {
        try
        {
          // Prevent out of band monitor responses from updating our pending
          // table until we are ready.
          synchronized (pendingMonitorDataLock)
          {
            // Clear the pending monitor data.
            pendingMonitorDataServerIDs.clear();
            pendingMonitorData = new MonitorData();

            // Initialize the monitor data using our local state.
            initializePendingMonitorData();

            // Send the monitor requests to the connected replication servers.
            for (ReplicationServerHandler rs : replicationServers.values())
            {
              int serverId = rs.getServerId();

              MonitorRequestMsg msg = new MonitorRequestMsg(
                  this.replicationServer.getServerId(), serverId);
              try
              {
                rs.send(msg);

                // Only register this server ID if we were able to send the
                // message.
                pendingMonitorDataServerIDs.add(serverId);
              }
              catch (IOException e)
              {
                // Log a message and do a best effort from here.
                Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
                    .get(baseDn, serverId, e.getMessage());
                logError(message);
              }
            }

            // Create the pending response latch based on the number of
            // expected monitor responses.
            pendingMonitorDataLatch = new CountDownLatch(
                pendingMonitorDataServerIDs.size());
          }

          // Wait for the responses to come back.
          pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);

          // Log messages for replication servers that have gone or come back.
          synchronized (pendingMonitorDataLock)
          {
            // Log servers that have come back.
            for (int serverId : monitorDataLateServers)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!pendingMonitorDataServerIDs.contains(serverId))
              {
                logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn, serverId));
              }
            }

            // Log servers that have gone away.
            for (int serverId : pendingMonitorDataServerIDs)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!monitorDataLateServers.contains(serverId))
              {
                logError(ERR_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
                    serverId));
              }
            }

            // Remember which servers were late this time.
            monitorDataLateServers.clear();
            monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
          }

          // Store the newly computed data as the reference.
          synchronized (pendingMonitorDataLock)
          {
            // Now we have the expected answers or an error occurred.
            pendingMonitorData.completeComputing();
            monitorData = pendingMonitorData;
            monitorDataLastBuildDate = TimeThread.getTime();
          }
        }
        finally
        {
          synchronized (pendingMonitorDataLock)
          {
            // Clear the pending state.
            pendingMonitorData = null;
            pendingMonitorDataLatch = null;
            pendingMonitorDataServerIDs.clear();
          }
        }
      }
    }

    return monitorData;
  }
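
  // Illustrative call (a sketch, not part of the original code): a periodic
  // monitoring task can refresh the cache; when the cached data is younger
  // than monitorDataLifeTime (500ms), the method returns it without a new
  // scatter/gather round:
  //
  //   try
  //   {
  //     MonitorData fresh = computeDomainMonitorData();
  //   }
  //   catch (InterruptedException e)
  //   {
  //     Thread.currentThread().interrupt();
  //   }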


  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   */
  private void initializePendingMonitorData()
  {
    // Let's process our directly connected LSes:
    // - in the ServerHandler for a given LS1, the stored state contains:
    //   - the max CN produced by LS1
    //   - the last CN consumed by LS1 from LS2..n
    // - in the RS domain/dbHandler, the built-in state contains:
    //   - the max CN produced by each server
    // So for a given connected LS we can take the state and the max from
    // the LS state.

    for (ServerHandler directlsh : directoryServers.values())
    {
      int serverID = directlsh.getServerId();

      // The state comes from the state stored in the SH.
      ServerState directlshState = directlsh.getServerState().duplicate();

      // The max CN sent by that LS also comes from the SH.
      ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
      if (maxcn == null)
      {
        // This directly connected LS has never produced any change.
        maxcn = new ChangeNumber(0, 0, serverID);
      }
      pendingMonitorData.setMaxCN(serverID, maxcn);
      pendingMonitorData.setLDAPServerState(serverID, directlshState);
      pendingMonitorData.setFirstMissingDate(serverID,
          directlsh.getApproxFirstMissingDate());
    }

    // Then initialize the max CN for each LS that produced something
    // - from our own local db state
    // - whether they are directly or indirectly connected.
    ServerState dbServerState = getDbServerState();
    pendingMonitorData.setRSState(replicationServer.getServerId(),
        dbServerState);
    Iterator<Integer> it = dbServerState.iterator();
    while (it.hasNext())
    {
      int sid = it.next();
      ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
      pendingMonitorData.setMaxCN(sid, storedCN);
    }
  }
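
  // Worked example (hypothetical values, added for clarity): if DS 1 is
  // directly connected and its handler reports max CN c1, and the local db
  // state is {1 -> c1, 2 -> c2}, then the pending monitor data starts with
  // maxCN(1) = c1 and maxCN(2) = c2. Remote RS responses merged later via
  // setMaxCNs() can only raise these per-server maxima, since the max CN is
  // kept across all RSes.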


  /**
   * Processes a Monitor message received from a remote Replication Server
   * and stores the data received.
   *
   * @param msg
   *          The message to be processed.
   * @param serverId
   *          The server ID of the replication server that sent the message.
   */
  private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
  {
    synchronized (pendingMonitorDataLock)
    {
      if (pendingMonitorData == null)
      {
        // This is a response for an earlier request whose computing is
        // already complete.
        logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
            msg.getSenderID()));
        return;
      }

      try
      {
        // Here is the RS state: a list of <serverID, lastChangeNumber>.
        // For each LDAP server, we keep the max CN across the RSes.
        ServerState replServerState = msg.getReplServerDbState();
        pendingMonitorData.setMaxCNs(replServerState);

        // Store the remote RS states.
        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);

        // Store the remote LDAP servers states.
        Iterator<Integer> lsidIterator = msg.ldapIterator();
        while (lsidIterator.hasNext())
        {
          int sid = lsidIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(sid);
          pendingMonitorData.setMaxCNs(dsServerState);
          pendingMonitorData.setLDAPServerState(sid, dsServerState);
          pendingMonitorData.setFirstMissingDate(sid,
              msg.getLDAPApproxFirstMissingDate(sid));
        }

        // Process the latency reported by the remote RSi on its connections.
        Iterator<Integer> rsidIterator = msg.rsIterator();
        while (rsidIterator.hasNext())
        {
          int rsid = rsidIterator.next();
          if (rsid == replicationServer.getServerId())
          {
            // This is the latency of the remote RSi regarding the current RS:
            // let's update the fmd of my connected LSes.
            for (ServerHandler connectedlsh : directoryServers.values())
            {
              int connectedlsid = connectedlsh.getServerId();
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
            }
          }
          else
          {
            // This is the latency of the remote RSi regarding another RSj:
            // let's update the latency of the LSes connected to RSj.
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            if (rsjHdr != null)
            {
              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
              }
            }
          }
        }
      }
      catch (RuntimeException e)
      {
        // FIXME: do we really expect these???
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()
            + stackTraceToSingleLineString(e)));
      }
      finally
      {
        // Decrease the number of expected responses and potentially
        // wake up the waiting requestor thread.
        if (pendingMonitorDataServerIDs.remove(serverId))
        {
          if (debugEnabled())
            TRACER.debugInfo("In " + this + " baseDn=" + baseDn
                + " processed msg from " + msg.getSenderID()
                + ", new monitor data: " + pendingMonitorData);
          pendingMonitorDataLatch.countDown();
        }
      }
    }
  }
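
  // Note on late responses (descriptive note, added for clarity): if a
  // response arrives after the requesting thread has timed out and cleared
  // the pending state, pendingMonitorData is null, so the message is logged
  // and ignored. That null check is what makes out-of-band responses
  // harmless.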


  {
    return replicationServers;
  }


  /**
   * A synchronization mechanism is created to ensure exclusive access to the
   * domain. The goal is to have a consistent view of the topology by locking

  private ReentrantLock lock = new ReentrantLock();

  /**
   * This lock is used to protect the generationid variable.
   */
  private final Object generationIDLock = new Object();

    builder.add(baseDn.toString() + " " + generationId);
    attributes.add(builder.toAttribute());

    MonitorData md = getDomainMonitorData();

    // Missing changes
    long missingChanges = md.getMissingChangesRS(replicationServer
        .getServerId());
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(missingChanges)));

    return attributes;
  }
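
  // Example output (hypothetical value): the published monitor entry would
  // contain, among other attributes, a line such as
  //
  //   missing-changes: 0
  //
  // where the value estimates how many changes this replication server has
  // not yet received, as derived from the consolidated monitor data.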