| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ReplicationIterator; |
| | | import org.opends.server.replication.server.changelog.je.DbHandler; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | */ |
| | | private AtomicReference<MonitoringPublisher> monitoringPublisher = |
| | | new AtomicReference<MonitoringPublisher>(); |
| | | /** |
| | | * Maintains monitor data for the current domain. |
| | | * <p> |
| | | * Created with a reference to this domain. Elsewhere in this class, incoming |
| | | * MonitorMsg responses are handed to it through receiveMonitorDataResponse() |
| | | * and its getMonitorData() is used to obtain the domain monitor data. |
| | | */ |
| | | private ReplicationDomainMonitor domainMonitor = |
| | | new ReplicationDomainMonitor(this); |
| | | |
| | | /** |
| | | * The following map contains one balanced tree for each replica ID to which |
| | |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // Monitor data management |
| | | /** |
| | | * The monitor data consolidated over the topology. |
| | | * <p> |
| | | * Declared volatile: computeDomainMonitorData() replaces the reference with |
| | | * a freshly built instance, while getDomainMonitorData() reads it without |
| | | * taking any lock. |
| | | */ |
| | | private volatile MonitorData monitorData = new MonitorData(); |
| | | |
| | | /** |
| | | * This lock guards against multiple concurrent monitor data recalculations. |
| | | * computeDomainMonitorData() holds it for the whole recalculation, |
| | | * including the wait for the remote responses. |
| | | */ |
| | | private final Object pendingMonitorLock = new Object(); |
| | | |
| | | /** |
| | | * The value of TimeThread.getTime() when monitorData was last rebuilt, or 0 |
| | | * if it has never been rebuilt. |
| | | * <p> |
| | | * Guarded by pendingMonitorLock. |
| | | */ |
| | | private long monitorDataLastBuildDate = 0; |
| | | |
| | | /** |
| | | * The set of replication servers which are already known to be slow to send |
| | | * monitor data. It is only used to avoid logging the same late (or |
| | | * recovered) server again on every recalculation. |
| | | * <p> |
| | | * Guarded by pendingMonitorLock. |
| | | */ |
| | | private final Set<Integer> monitorDataLateServers = new HashSet<Integer>(); |
| | | |
| | | /** This lock serializes updates to the pending monitor data. */ |
| | | private final Object pendingMonitorDataLock = new Object(); |
| | | |
| | | /** |
| | | * Monitor data which is currently being calculated, or null when no |
| | | * recalculation is in progress. Guarded by pendingMonitorDataLock. |
| | | */ |
| | | private MonitorData pendingMonitorData; |
| | | |
| | | /** |
| | | * A set containing the IDs of servers from which we are currently expecting |
| | | * monitor responses. When a response is received from a server we remove the |
| | | * ID from this set, and count down the latch if the ID was in the set. |
| | | * <p> |
| | | * Guarded by pendingMonitorDataLock. |
| | | */ |
| | | private final Set<Integer> pendingMonitorDataServerIDs = |
| | | new HashSet<Integer>(); |
| | | |
| | | /** |
| | | * This latch is used in order to count incoming responses as they arrive. |
| | | * It is non-null only while a recalculation is in progress: it is created |
| | | * by computeDomainMonitorData() once the requests have been sent and reset |
| | | * to null when that computation ends. Since incoming responses may arrive at |
| | | * any time, even when there is no pending monitor request, access to the |
| | | * latch must be guarded. |
| | | * <p> |
| | | * Guarded by pendingMonitorDataLock. |
| | | * <p> |
| | | * NOTE(review): computeDomainMonitorData() calls await() on the latch after |
| | | * releasing pendingMonitorDataLock (the same thread created it) - confirm |
| | | * that this is the intended exception to the rule above. |
| | | */ |
| | | private CountDownLatch pendingMonitorDataLatch = null; |
| | | |
| | | /** |
| | | * Maximum age of monitorData, expressed in the units of TimeThread.getTime(), |
| | | * before computeDomainMonitorData() rebuilds it. |
| | | * <p> |
| | | * TODO: Remote monitor data cache lifetime is 500ms/should be configurable. |
| | | */ |
| | | private final long monitorDataLifeTime = 500; |
| | | |
| | | |
| | | |
| | | /** |
| | | * The needed info for each received assured update message we are waiting |
| | | * acks for. |
| | |
| | | |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.incrementInCount(); |
| | | |
| | | if (generationId < 0) |
| | | { |
| | | generationId = sourceHandler.getGenerationId(); |
| | | } |
| | | setGenerationIdIfUnset(sourceHandler.getGenerationId()); |
| | | |
| | | /** |
| | | * If this is an assured message (a message requesting ack), we must |
| | |
| | | { |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | { |
| | | TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update " |
| | | + update.getChangeNumber() |
| | | TRACER.debugInfo(getMessage("update " + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " with generation id " |
| | | + dsHandler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | + "generation id " + generationId)); |
| | | } |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | TRACER.debugInfo("In RS " + localReplicationServer.getServerId() |
| | | + " for dn " + baseDn + ", update " + update.getChangeNumber() |
| | | TRACER.debugInfo(getMessage("update " + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " as it is in full update"); |
| | | + dsHandler.getServerId() + " as it is in full update")); |
| | | } |
| | | } |
| | | |
| | |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " + localReplicationServer.getServerId() |
| | | + " for "+ baseDn |
| | | + ", sending timeout for assured update with change " |
| | | + " number " + cn + " to server id " |
| | | + origServer.getServerId()); |
| | | TRACER.debugInfo(getMessage( |
| | | "sending timeout for assured update with change number " + cn |
| | | + " to server id " + origServer.getServerId())); |
| | | } |
| | | try |
| | | { |
| | |
| | | unregisterServerHandler(sHandler); |
| | | sHandler.shutdown(); |
| | | |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | resetGenerationIdIfPossible(); |
| | | if (!shutdown) |
| | | { |
| | | if (isDirectoryServer) |
| | |
| | | * - traverse replicationServers list and test for each if DS are connected |
| | | * So it strongly relies on the directoryServers list |
| | | */ |
| | | private void mayResetGenerationId() |
| | | private void resetGenerationIdIfPossible() |
| | | { |
| | | String prefix = |
| | | "In RS " + this.localReplicationServer.getMonitorInstanceName() |
| | | + " for " + baseDn + " "; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus=" |
| | | + generationIdSavedStatus); |
| | | TRACER.debugInfo(getMessage( |
| | | "mayResetGenerationId generationIdSavedStatus=" |
| | | + generationIdSavedStatus)); |
| | | } |
| | | |
| | | // If there is no more any LDAP server connected to this domain in the |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(prefix + "mayResetGenerationId skip RS " |
| | | TRACER.debugInfo(getMessage("mayResetGenerationId skip RS " |
| | | + rsHandler.getMonitorInstanceName() |
| | | + " that has different genId"); |
| | | + " that has different genId")); |
| | | } |
| | | } |
| | | else if (rsHandler.hasRemoteLDAPServers()) |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(prefix + "mayResetGenerationId RS " |
| | | TRACER.debugInfo(getMessage("mayResetGenerationId RS " |
| | | + rsHandler.getMonitorInstanceName() |
| | | + " has ldap servers connected to it" |
| | | + " - will not reset generationId"); |
| | | + " - will not reset generationId")); |
| | | } |
| | | break; |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(prefix + "has ldap servers connected to it" |
| | | + " - will not reset generationId"); |
| | | TRACER.debugInfo(getMessage("has ldap servers connected to it" |
| | | + " - will not reset generationId")); |
| | | } |
| | | } |
| | | |
| | | if (!ldapServersConnectedInTheTopology |
| | | && !this.generationIdSavedStatus |
| | | && !generationIdSavedStatus |
| | | && generationId != -1) |
| | | { |
| | | changeGenerationId(-1, false); |
| | |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMsg monitorMsg = (MonitorMsg) msg; |
| | | receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId()); |
| | | domainMonitor.receiveMonitorDataResponse(monitorMsg, |
| | | msgEmitter.getServerId()); |
| | | } else |
| | | { |
| | | replyWithUnroutableMsgType(msgEmitter, msg); |
| | |
| | | if (msgEmitter.isDataServer()) |
| | | { |
| | | // Monitoring information requested by a DS |
| | | MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( |
| | | msg.getDestination(), msg.getSenderID(), monitorData); |
| | | try |
| | | { |
| | | MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( |
| | | msg.getDestination(), msg.getSenderID()); |
| | | msgEmitter.send(monitorMsg); |
| | | } |
| | | catch (IOException e) |
| | |
| | | * The sender of this message. |
| | | * @param destination |
| | | * The destination of this message. |
| | | * @param monitorData |
| | | * The domain monitor data which should be used for the message. |
| | | * @return The newly created and filled MonitorMsg. Null if a problem occurred |
| | | * during message creation. |
| | | */ |
| | | public MonitorMsg createGlobalTopologyMonitorMsg( |
| | | int sender, int destination, MonitorData monitorData) |
| | | public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination) |
| | | { |
| | | final MonitorMsg returnMsg = new MonitorMsg(sender, destination); |
| | | |
| | | returnMsg.setReplServerDbState(getDbServerState()); |
| | | |
| | | // Add the information about the replicas currently in the topology. |
| | | Iterator<Integer> it = monitorData.ldapIterator(); |
| | | while (it.hasNext()) |
| | | // Add the server state for each DS and RS currently in the topology. |
| | | final MonitorData monitorData = getDomainMonitorData(); |
| | | for (int replicaId : toIterable(monitorData.ldapIterator())) |
| | | { |
| | | int replicaId = it.next(); |
| | | returnMsg.setServerState(replicaId, |
| | | monitorData.getLDAPServerState(replicaId), |
| | | monitorData.getApproxFirstMissingDate(replicaId), true); |
| | | } |
| | | |
| | | // Add the information about the RSs currently in the topology. |
| | | it = monitorData.rsIterator(); |
| | | while (it.hasNext()) |
| | | for (int replicaId : toIterable(monitorData.rsIterator())) |
| | | { |
| | | int replicaId = it.next(); |
| | | returnMsg.setServerState(replicaId, |
| | | monitorData.getRSStates(replicaId), |
| | | monitorData.getRSApproxFirstMissingDate(replicaId), false); |
| | |
| | | |
| | | try |
| | | { |
| | | MonitorMsg monitorMsg = new MonitorMsg(sender, destination); |
| | | final MonitorMsg monitorMsg = new MonitorMsg(sender, destination); |
| | | monitorMsg.setReplServerDbState(getDbServerState()); |
| | | |
| | | // Populate for each connected LDAP Server |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the older missing change |
| | | // Add the server state for each connected DS and RS. |
| | | for (DataServerHandler dsHandler : this.connectedDSs.values()) |
| | | { |
| | | monitorMsg.setServerState(dsHandler.getServerId(), dsHandler |
| | | .getServerState(), dsHandler.getApproxFirstMissingDate(), true); |
| | | } |
| | | |
| | | // Same for the connected RS |
| | | for (ReplicationServerHandler rsHandler : this.connectedRSs.values()) |
| | | { |
| | | monitorMsg.setServerState(rsHandler.getServerId(), rsHandler |
| | | .getServerState(), rsHandler.getApproxFirstMissingDate(), false); |
| | | } |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | | monitorMsg.setReplServerDbState(getDbServerState()); |
| | | return monitorMsg; |
| | | } |
| | | finally |
| | |
| | | |
| | | if (this.generationId != generationId) |
| | | { |
| | | // The generation ID is changing. |
| | | clearDbs(); |
| | | |
| | | this.generationId = generationId; |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + getLocalRSServerId() + |
| | | " Receiving ChangeStatusMsg from " + senderHandler.getServerId() + |
| | | " for baseDn " + baseDn + ":\n" + csMsg); |
| | | TRACER.debugInfo(getMessage("receiving ChangeStatusMsg from " |
| | | + senderHandler.getServerId() + ":\n" + csMsg)); |
| | | } |
| | | |
| | | try |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " + getLocalRSServerId() |
| | | + " Receiving TopologyMsg from " + rsHandler.getServerId() |
| | | + " for baseDn " + baseDn + ":\n" + topoMsg); |
| | | TRACER.debugInfo(getMessage("receiving TopologyMsg from " |
| | | + rsHandler.getServerId() + ":\n" + topoMsg)); |
| | | } |
| | | |
| | | try |
| | |
| | | // Handle generation id |
| | | if (allowResetGenId) |
| | | { |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | if (generationId < 0) |
| | | { |
| | | generationId = rsHandler.getGenerationId(); |
| | | } |
| | | resetGenerationIdIfPossible(); |
| | | setGenerationIdIfUnset(rsHandler.getGenerationId()); |
| | | } |
| | | |
| | | if (isDifferentGenerationId(rsHandler.getGenerationId())) |
| | |
| | | } |
| | | } |
| | | |
| | | /* ======================= |
| | | * Monitor Data generation |
| | | * ======================= |
| | | */ |
| | | private void setGenerationIdIfUnset(long generationId) |
| | | { |
| | | if (this.generationId < 0) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the latest monitor data available for this replication server |
| | |
| | | * <p> |
| | | * NOTE(review): as listed here, this returns the cached monitorData field |
| | | * as-is and never triggers a recomputation; refreshing the cache is left |
| | | * to computeDomainMonitorData(). Part of this comment is elided in this |
| | | * listing - confirm against the full source. |
| | | */ |
| | | MonitorData getDomainMonitorData() |
| | | { |
| | | return monitorData; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the monitor data for this replication server domain, recomputing |
| | | * it first when the cached copy is older than monitorDataLifeTime. |
| | | * <p> |
| | | * A recomputation seeds the pending monitor data from local state, sends a |
| | | * MonitorRequestMsg to every connected replication server, waits at most 5 |
| | | * seconds for the responses, logs the servers which have become late or |
| | | * have answered again, and finally publishes the result in monitorData. |
| | | * Only one recomputation runs at a time: concurrent callers block on |
| | | * pendingMonitorLock and then return the cached data if it is still fresh. |
| | | * The pending state is always cleared on exit, even when an exception is |
| | | * thrown. |
| | | * |
| | | * @return The current monitor data for this replication server domain: |
| | | * freshly recomputed if the cache had expired, the cached copy |
| | | * otherwise. |
| | | * @throws InterruptedException |
| | | * If this thread is interrupted while waiting for a response. |
| | | */ |
| | | MonitorData computeDomainMonitorData() throws InterruptedException |
| | | { |
| | | // Only allow one monitor recalculation at a time. |
| | | synchronized (pendingMonitorLock) |
| | | { |
| | | if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread |
| | | .getTime()) |
| | | { |
| | | try |
| | | { |
| | | // Prevent out of band monitor responses from updating our pending |
| | | // table until we are ready. |
| | | synchronized (pendingMonitorDataLock) |
| | | { |
| | | // Clear the pending monitor data. |
| | | pendingMonitorDataServerIDs.clear(); |
| | | pendingMonitorData = new MonitorData(); |
| | | |
| | | // Initialize the monitor data. |
| | | initializePendingMonitorData(); |
| | | |
| | | // Send the monitor requests to the connected replication servers. |
| | | for (ReplicationServerHandler rs : connectedRSs.values()) |
| | | { |
| | | // ID of the server to query; it is added to the pending table below, once the request has been sent. |
| | | int serverId = rs.getServerId(); |
| | | |
| | | MonitorRequestMsg msg = new MonitorRequestMsg( |
| | | this.localReplicationServer.getServerId(), serverId); |
| | | try |
| | | { |
| | | rs.send(msg); |
| | | |
| | | // Only register this server ID if we were able to send the |
| | | // message. |
| | | pendingMonitorDataServerIDs.add(serverId); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // Log a message and do a best effort from here. |
| | | Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST |
| | | .get(baseDn, serverId, e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | | |
| | | // Create the pending response latch based on the number of expected |
| | | // monitor responses. |
| | | pendingMonitorDataLatch = new CountDownLatch( |
| | | pendingMonitorDataServerIDs.size()); |
| | | } |
| | | |
| | | // Wait (pendingMonitorDataLock released, so that responses can be recorded) for the responses to come back, at most 5 seconds. |
| | | pendingMonitorDataLatch.await(5, TimeUnit.SECONDS); |
| | | |
| | | // Log messages for replication servers that have gone or come back. |
| | | synchronized (pendingMonitorDataLock) |
| | | { |
| | | // Log servers that have come back. |
| | | for (int serverId : monitorDataLateServers) |
| | | { |
| | | // Ensure that we only log once per server: don't fill the |
| | | // error log with repeated messages. |
| | | if (!pendingMonitorDataServerIDs.contains(serverId)) |
| | | { |
| | | logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn, |
| | | serverId)); |
| | | } |
| | | } |
| | | |
| | | // Log servers that have gone away. |
| | | for (int serverId : pendingMonitorDataServerIDs) |
| | | { |
| | | // Ensure that we only log once per server: don't fill the |
| | | // error log with repeated messages. |
| | | if (!monitorDataLateServers.contains(serverId)) |
| | | { |
| | | logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(baseDn, |
| | | serverId)); |
| | | } |
| | | } |
| | | |
| | | // Remember which servers were late this time. |
| | | monitorDataLateServers.clear(); |
| | | monitorDataLateServers.addAll(pendingMonitorDataServerIDs); |
| | | } |
| | | |
| | | // Store the new computed data as the reference |
| | | synchronized (pendingMonitorDataLock) |
| | | { |
| | | // Now we have the expected answers, or the wait above timed out |
| | | pendingMonitorData.completeComputing(); |
| | | monitorData = pendingMonitorData; |
| | | monitorDataLastBuildDate = TimeThread.getTime(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | synchronized (pendingMonitorDataLock) |
| | | { |
| | | // Clear pending state. |
| | | pendingMonitorData = null; |
| | | pendingMonitorDataLatch = null; |
| | | pendingMonitorDataServerIDs.clear(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return monitorData; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Start collecting global monitoring information for this |
| | | * ReplicationServerDomain. |
| | | */ |
| | | |
| | | private void initializePendingMonitorData() |
| | | { |
| | | // Let's process our directly connected DS |
| | | // - in the ServerHandler for a given DS1, the stored state contains : |
| | | // - the max CN produced by DS1 |
| | | // - the last CN consumed by DS1 from DS2..n |
| | | // - in the RSdomain/dbHandler, the built-in state contains : |
| | | // - the max CN produced by each server |
| | | // So for a given DS connected we can take the state and the max from |
| | | // the DS/state. |
| | | |
| | | for (ServerHandler ds : connectedDSs.values()) |
| | | { |
| | | int serverID = ds.getServerId(); |
| | | |
| | | // the state comes from the state stored in the SH |
| | | ServerState dsState = ds.getServerState().duplicate(); |
| | | |
| | | // the max CN sent by that LS also comes from the SH |
| | | ChangeNumber maxcn = dsState.getChangeNumber(serverID); |
| | | if (maxcn == null) |
| | | { |
| | | // This directly connected LS has never produced any change |
| | | maxcn = new ChangeNumber(0, 0, serverID); |
| | | } |
| | | pendingMonitorData.setMaxCN(serverID, maxcn); |
| | | pendingMonitorData.setLDAPServerState(serverID, dsState); |
| | | pendingMonitorData.setFirstMissingDate(serverID, |
| | | ds.getApproxFirstMissingDate()); |
| | | } |
| | | |
| | | // Then initialize the max CN for the LS that produced something |
| | | // - from our own local db state |
| | | // - whatever they are directly or indirectly connected |
| | | ServerState dbServerState = getDbServerState(); |
| | | pendingMonitorData.setRSState(localReplicationServer.getServerId(), |
| | | dbServerState); |
| | | for (int serverId : dbServerState) { |
| | | ChangeNumber storedCN = dbServerState.getChangeNumber(serverId); |
| | | pendingMonitorData.setMaxCN(serverId, storedCN); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Processes a Monitor message received from a remote Replication Server and |
| | | * merges the data it carries into the pending monitor data. |
| | | * <p> |
| | | * When no recalculation is in progress (pendingMonitorData is null) the |
| | | * message is logged and ignored. Otherwise, whether or not the merge |
| | | * succeeds, the provided server ID is removed from the set of awaited |
| | | * servers and, if it was awaited, the pending latch is counted down. All |
| | | * of this happens while holding pendingMonitorDataLock. |
| | | * <p> |
| | | * NOTE(review): the trailing "return domainMonitor.getMonitorData();" |
| | | * statement returns a value from a void method. It looks like residue of |
| | | * the diff this listing was taken from (its type would suit |
| | | * getDomainMonitorData()) - confirm against the real source file. |
| | | * |
| | | * @param msg |
| | | * The message to be processed. |
| | | * @param serverId |
| | | * The ID of the server the response came from, as reported by the |
| | | * handler which received it; used to tick that server off the set |
| | | * of pending responses. |
| | | */ |
| | | private void receivesMonitorDataResponse(MonitorMsg msg, int serverId) |
| | | { |
| | | synchronized (pendingMonitorDataLock) |
| | | { |
| | | if (pendingMonitorData == null) |
| | | { |
| | | // This is a response for an earlier request whose computing is |
| | | // already complete. |
| | | logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn, |
| | | msg.getSenderID())); |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Here is the RS state : list <serverID, lastChangeNumber> |
| | | // For each LDAP Server, we keep the max CN across the RSes |
| | | ServerState replServerState = msg.getReplServerDbState(); |
| | | pendingMonitorData.setMaxCNs(replServerState); |
| | | |
| | | // store the remote RS states. |
| | | pendingMonitorData.setRSState(msg.getSenderID(), replServerState); |
| | | |
| | | // Store the remote LDAP servers states |
| | | Iterator<Integer> dsServerIdIterator = msg.ldapIterator(); |
| | | while (dsServerIdIterator.hasNext()) |
| | | { |
| | | int dsServerId = dsServerIdIterator.next(); |
| | | ServerState dsServerState = msg.getLDAPServerState(dsServerId); |
| | | pendingMonitorData.setMaxCNs(dsServerState); |
| | | pendingMonitorData.setLDAPServerState(dsServerId, dsServerState); |
| | | pendingMonitorData.setFirstMissingDate(dsServerId, |
| | | msg.getLDAPApproxFirstMissingDate(dsServerId)); |
| | | } |
| | | |
| | | // Process the latency reported by the remote RSi on its connections |
| | | // to the other RSes |
| | | Iterator<Integer> rsServerIdIterator = msg.rsIterator(); |
| | | while (rsServerIdIterator.hasNext()) |
| | | { |
| | | int rsServerId = rsServerIdIterator.next(); |
| | | long newFmd = msg.getRSApproxFirstMissingDate(rsServerId); |
| | | if (rsServerId == localReplicationServer.getServerId()) |
| | | { |
| | | // this is the latency of the remote RSi regarding the current RS |
| | | // let's update the fmd of my connected LS |
| | | for (DataServerHandler connectedDS : connectedDSs.values()) |
| | | { |
| | | int connectedServerId = connectedDS.getServerId(); |
| | | pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // this is the latency of the remote RSi regarding another RSj |
| | | // let's update the latency of the LSes connected to RSj |
| | | ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId); |
| | | if (rsjHdr != null) |
| | | { |
| | | for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds()) |
| | | { |
| | | pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | // FIXME: do we really expect these??? |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get( |
| | | e.getMessage() + stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | // Decreases the number of expected responses and potentially |
| | | // wakes up the waiting requester thread. |
| | | if (pendingMonitorDataServerIDs.remove(serverId)) |
| | | { |
| | | pendingMonitorDataLatch.countDown(); |
| | | } |
| | | } |
| | | } |
| | | return domainMonitor.getMonitorData(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Map<Integer, DataServerHandler> getConnectedDSs() |
| | | { |
| | | // Hand out a read-only view rather than the internal map itself, so that |
| | | // callers cannot add or remove handlers behind this domain's back. The |
| | | // view is live: it reflects later changes made to connectedDSs. A second |
| | | // return statement after the first one would be unreachable and is |
| | | // rejected by the compiler, so only this one is kept. |
| | | return Collections.unmodifiableMap(connectedDSs); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Map<Integer, ReplicationServerHandler> getConnectedRSs() |
| | | { |
| | | // Hand out a read-only view rather than the internal map itself, so that |
| | | // callers cannot add or remove handlers behind this domain's back. The |
| | | // view is live: it reflects later changes made to connectedRSs. A second |
| | | // return statement after the first one would be unreachable and is |
| | | // rejected by the compiler, so only this one is kept. |
| | | return Collections.unmodifiableMap(connectedRSs); |
| | | } |
| | | |
| | | |
| | |
| | | String.valueOf(localReplicationServer.getServerId()))); |
| | | attributes.add(Attributes.create("replication-server-port", |
| | | String.valueOf(localReplicationServer.getReplicationPort()))); |
| | | |
| | | // Add all the base DNs that are known by this replication server. |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | | |
| | | // Publish to monitor the generation ID by replicationServerDomain |
| | | attributes.add(Attributes.create("generation-id", |
| | | baseDn + " " + generationId)); |
| | | |
| | | MonitorData md = getDomainMonitorData(); |
| | | |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChangesRS(localReplicationServer |
| | | .getServerId()); |
| | | long missingChanges = getDomainMonitorData().getMissingChangesRS( |
| | | localReplicationServer.getServerId()); |
| | | attributes.add(Attributes.create("missing-changes", |
| | | String.valueOf(missingChanges))); |
| | | |
| | |
| | | // connection): store handler. |
| | | connectedRSs.put(rsHandler.getServerId(), rsHandler); |
| | | } |
| | | |
| | | private String getMessage(String message) |
| | | { |
| | | return "In RS " + localReplicationServer.getServerId() + " for " + baseDn |
| | | + ": " + message; |
| | | } |
| | | } |