| | |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import org.opends.server.replication.server. |
| | | ReplicationServer.GlobalServerId; |
| | | |
| | | /** |
| | | * This class defines an in-memory cache that will be used to store |
| | |
| | | // late or not |
| | | private StatusAnalyzer statusAnalyzer = null; |
| | | |
| | | // The monitoring publisher that periodically sends monitoring messages to the |
| | | // topology |
| | | private MonitoringPublisher monitoringPublisher = null; |
| | | |
| | | /* |
| | | * The following map contains one balanced tree for each replica ID |
| | | * to which we are currently publishing |
| | |
| | | // Try doing job anyway... |
| | | } |
| | | |
| | | // Stop useless monitoring publisher if no more RS or DS in domain |
| | | if ( (directoryServers.size() + replicationServers.size() )== 1) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServer.getMonitorInstanceName() + |
| | | " remote server " + handler.getMonitorInstanceName() + " is " + |
| | | "the last RS/DS to be stopped: stopping monitoring publisher"); |
| | | stopMonitoringPublisher(); |
| | | } |
| | | |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | if (replicationServers.containsValue(handler)) |
| | |
| | | buildAndSendTopoInfoToDSs(null); |
| | | } |
| | | } |
| | | } else |
| | | } else if (directoryServers.containsValue(handler)) |
| | | { |
| | | if (directoryServers.containsValue(handler)) |
| | | // If this is the last DS for the domain, |
| | | // shutdown the status analyzer |
| | | if (directoryServers.size() == 1) |
| | | { |
| | | // If this is the last DS for the domain, |
| | | // shutdown the status analyzer |
| | | if (directoryServers.size() == 1) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServer.getMonitorInstanceName() + |
| | | " remote server " + handler.getMonitorInstanceName() + |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServer.getMonitorInstanceName() + |
| | | " remote server " + handler.getMonitorInstanceName() + |
| | | " is the last DS to be stopped: stopping status analyzer"); |
| | | stopStatusAnalyzer(); |
| | | } |
| | | stopStatusAnalyzer(); |
| | | } |
| | | |
| | | unregisterServerHandler(handler); |
| | | handler.shutdown(); |
| | | unregisterServerHandler(handler); |
| | | handler.shutdown(); |
| | | |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | if (!shutdown) |
| | | { |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | if (!shutdown) |
| | | { |
| | | buildAndSendTopoInfoToRSs(); |
| | | // Warn our DSs that a RS or DS has quit (does not use this |
| | | // handler as already removed from list) |
| | | buildAndSendTopoInfoToDSs(null); |
| | | } |
| | | buildAndSendTopoInfoToRSs(); |
| | | // Warn our DSs that a RS or DS has quit (does not use this |
| | | // handler as already removed from list) |
| | | buildAndSendTopoInfoToDSs(null); |
| | | } |
| | | else if (otherHandlers.contains(handler)) |
| | | { |
| | | unRegisterHandler(handler); |
| | | handler.shutdown(); |
| | | } |
| | | } else if (otherHandlers.contains(handler)) |
| | | { |
| | | unRegisterHandler(handler); |
| | | handler.shutdown(); |
| | | } |
| | | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | // in the topology. |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | MonitorMsg returnMsg = |
| | | new MonitorMsg(msg.getDestination(), msg.getsenderID()); |
| | | // Monitoring information requested by a DS |
| | | MonitorMsg monitorMsg = |
| | | createGlobalTopologyMonitorMsg(msg.getDestination(), |
| | | msg.getsenderID()); |
| | | |
| | | try |
| | | if (monitorMsg != null) |
| | | { |
| | | returnMsg.setReplServerDbState(getDbServerState()); |
| | | // Update the information we have about all servers |
| | | // in the topology. |
| | | MonitorData md = computeMonitorData(); |
| | | |
| | | // Add the informations about the Replicas currently in |
| | | // the topology. |
| | | Iterator<Integer> it = md.ldapIterator(); |
| | | while (it.hasNext()) |
| | | try |
| | | { |
| | | int replicaId = it.next(); |
| | | returnMsg.setServerState( |
| | | replicaId, md.getLDAPServerState(replicaId), |
| | | md.getApproxFirstMissingDate(replicaId), true); |
| | | } |
| | | |
| | | // Add the informations about the Replication Servers |
| | | // currently in the topology. |
| | | it = md.rsIterator(); |
| | | while (it.hasNext()) |
| | | senderHandler.send(monitorMsg); |
| | | } catch (IOException e) |
| | | { |
| | | int replicaId = it.next(); |
| | | returnMsg.setServerState( |
| | | replicaId, md.getRSStates(replicaId), |
| | | md.getRSApproxFirstMissingDate(replicaId), false); |
| | | // the connection was closed. |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // If we can't compute the Monitor Information, send |
| | | // back an empty message. |
| | | } |
| | | try |
| | | { |
| | | senderHandler.send(returnMsg); |
| | | } catch (IOException e) |
| | | { |
| | | // the connection was closed. |
| | | } |
| | | return; |
| | | } |
| | | |
| | | MonitorMsg monitorMsg = |
| | | new MonitorMsg(msg.getDestination(), msg.getsenderID()); |
| | | |
| | | // Populate for each connected LDAP Server |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the older missing change |
| | | for (DataServerHandler lsh : this.directoryServers.values()) |
| | | } else |
| | | { |
| | | monitorMsg.setServerState( |
| | | lsh.getServerId(), |
| | | lsh.getServerState(), |
| | | lsh.getApproxFirstMissingDate(), |
| | | true); |
| | | } |
| | | // Monitoring information requested by a RS |
| | | MonitorMsg monitorMsg = |
| | | createLocalTopologyMonitorMsg(msg.getDestination(), |
| | | msg.getsenderID()); |
| | | |
| | | // Same for the connected RS |
| | | for (ReplicationServerHandler rsh : this.replicationServers.values()) |
| | | { |
| | | monitorMsg.setServerState( |
| | | rsh.getServerId(), |
| | | rsh.getServerState(), |
| | | rsh.getApproxFirstMissingDate(), |
| | | false); |
| | | } |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | | monitorMsg.setReplServerDbState(this.getDbServerState()); |
| | | |
| | | |
| | | try |
| | | { |
| | | senderHandler.send(monitorMsg); |
| | | } catch (Exception e) |
| | | { |
| | | // We log the error. The requestor will detect a timeout or |
| | | // any other failure on the connection. |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( |
| | | Integer.toString((msg.getDestination())))); |
| | | if (monitorMsg != null) |
| | | { |
| | | try |
| | | { |
| | | senderHandler.send(monitorMsg); |
| | | } catch (Exception e) |
| | | { |
| | | // We log the error. The requestor will detect a timeout or |
| | | // any other failure on the connection. |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( |
| | | Integer.toString((msg.getDestination())))); |
| | | } |
| | | } |
| | | } |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMsg monitorMsg = |
| | | (MonitorMsg) msg; |
| | | |
| | | receivesMonitorDataResponse(monitorMsg); |
| | | GlobalServerId globalServerId = |
| | | new GlobalServerId(baseDn, senderHandler.getServerId()); |
| | | receivesMonitorDataResponse(monitorMsg, globalServerId); |
| | | } else |
| | | { |
| | | logError(NOTE_ERR_ROUTING_TO_SERVER.get( |
| | |
| | | } |
| | | |
| | | /** |
| | | * Builds a monitor message carrying monitoring information for the |
| | | * whole topology. |
| | | * <p> |
| | | * The message is filled with the local replication server db state and, |
| | | * for every replica and every replication server listed in the freshly |
| | | * computed monitor data, its server state and approximate first missing |
| | | * date. |
| | | * |
| | | * @param sender The sender of this message. |
| | | * @param destination The destination of this message. |
| | | * @return The newly created MonitorMsg; this method never returns null. |
| | | * If a DirectoryException is raised while the message is being filled, |
| | | * the message is returned with whatever was set before the failure. |
| | | */ |
| | | public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination) |
| | | { |
| | |   final MonitorMsg globalMsg = new MonitorMsg(sender, destination); |
| | | |
| | |   try |
| | |   { |
| | |     // State of the local replication server db goes in first. |
| | |     globalMsg.setReplServerDbState(getDbServerState()); |
| | | |
| | |     // Refresh what we know about all the servers in the topology. |
| | |     final MonitorData monitorData = computeMonitorData(); |
| | | |
| | |     // One entry per replica currently in the topology. |
| | |     for (Iterator<Integer> dsIds = monitorData.ldapIterator(); |
| | |          dsIds.hasNext();) |
| | |     { |
| | |       int dsId = dsIds.next(); |
| | |       globalMsg.setServerState( |
| | |           dsId, |
| | |           monitorData.getLDAPServerState(dsId), |
| | |           monitorData.getApproxFirstMissingDate(dsId), |
| | |           true); |
| | |     } |
| | | |
| | |     // One entry per replication server currently in the topology. |
| | |     for (Iterator<Integer> rsIds = monitorData.rsIterator(); |
| | |          rsIds.hasNext();) |
| | |     { |
| | |       int rsId = rsIds.next(); |
| | |       globalMsg.setServerState( |
| | |           rsId, |
| | |           monitorData.getRSStates(rsId), |
| | |           monitorData.getRSApproxFirstMissingDate(rsId), |
| | |           false); |
| | |     } |
| | |   } |
| | |   catch (DirectoryException e) |
| | |   { |
| | |     // The monitor information could not be computed: the message is |
| | |     // sent back as it is (possibly empty). |
| | |   } |
| | | |
| | |   return globalMsg; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new monitor message including monitoring information for the |
| | | * topology directly connected to this RS. This includes information for: |
| | | * - local RS |
| | | * - all direct DSs |
| | | * - all direct RSs |
| | | * <p> |
| | | * The domain lock is taken while the lists of connected servers are read |
| | | * and is released before returning. |
| | | * |
| | | * @param sender The sender of this message. |
| | | * @param destination The destination of this message. |
| | | * @return The newly created and filled MonitorMsg. Null if the calling |
| | | * thread was interrupted before the message could be created; in that case |
| | | * the interrupt status of the thread is restored. |
| | | */ |
| | | public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination) |
| | | { |
| | |   MonitorMsg monitorMsg = null; |
| | | |
| | |   try |
| | |   { |
| | |     // Lock domain as we need to go through connected servers list |
| | |     lock(); |
| | | |
| | |     monitorMsg = new MonitorMsg(sender, destination); |
| | | |
| | |     // Populate for each connected LDAP Server |
| | |     // from the states stored in the serverHandler. |
| | |     // - the server state |
| | |     // - the older missing change |
| | |     for (DataServerHandler lsh : this.directoryServers.values()) |
| | |     { |
| | |       monitorMsg.setServerState( |
| | |           lsh.getServerId(), |
| | |           lsh.getServerState(), |
| | |           lsh.getApproxFirstMissingDate(), |
| | |           true); |
| | |     } |
| | | |
| | |     // Same for the connected RS |
| | |     for (ReplicationServerHandler rsh : this.replicationServers.values()) |
| | |     { |
| | |       monitorMsg.setServerState( |
| | |           rsh.getServerId(), |
| | |           rsh.getServerState(), |
| | |           rsh.getApproxFirstMissingDate(), |
| | |           false); |
| | |     } |
| | | |
| | |     // Populate the RS state in the msg from the DbState |
| | |     monitorMsg.setReplServerDbState(this.getDbServerState()); |
| | |   } |
| | |   catch (InterruptedException e) |
| | |   { |
| | |     // Interrupted (e.g. while waiting for the lock): do not lose the |
| | |     // interruption request, restore the flag so that the code up the |
| | |     // stack can see it and react. |
| | |     Thread.currentThread().interrupt(); |
| | |   } |
| | |   finally |
| | |   { |
| | |     // Only release what we actually hold: lock() may have failed. |
| | |     if (hasLock()) |
| | |     { |
| | |       release(); |
| | |     } |
| | |   } |
| | | |
| | |   return monitorMsg; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicationServerDomain. |
| | | */ |
| | | public void shutdown() |
| | |
| | | |
| | | /** |
| | | * Send a TopologyMsg to all the connected directory servers in order to |
| | | * let. |
| | | * them know the topology (every known DSs and RSs) |
| | | * let them know the topology (every known DSs and RSs). |
| | | * @param notThisOne If not null, the topology message will not be sent to |
| | | * this passed server. |
| | | */ |
| | |
| | | dsInfos.add(serverHandler.toDSInfo()); |
| | | } |
| | | |
| | | // Create info for us (local RS) |
| | | // Create info for the local RS |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(), |
| | | generationId, replicationServer.getGroupId()); |
| | | generationId, replicationServer.getGroupId(), |
| | | replicationServer.getWeight()); |
| | | rsInfos.add(localRSInfo); |
| | | |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | |
| | | |
| | | // Add our own info (local RS) |
| | | RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(), |
| | | generationId, replicationServer.getGroupId()); |
| | | generationId, replicationServer.getGroupId(), |
| | | replicationServer.getWeight()); |
| | | rsInfos.add(localRSInfo); |
| | | |
| | | // Go through every peer RSs (and get their connected DSs), also add info |
| | |
| | | * Start collecting global monitoring information for this |
| | | * ReplicationServerDomain. |
| | | * |
| | | * @return The number of response that should come back. |
| | | * @param expectedMonitoringMsg The list of server handler we have to wait a |
| | | * monitoring message from. Will be filled as necessary by this method. |
| | | * |
| | | * @throws DirectoryException In case the monitoring information could |
| | | * not be collected. |
| | | */ |
| | | |
| | | int initializeMonitorData() throws DirectoryException |
| | | void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg) |
| | | throws DirectoryException |
| | | { |
| | | synchronized (monitorDataLock) |
| | | { |
| | |
| | | } |
| | | |
| | | // Send the request for remote monitor data to the |
| | | return sendMonitorDataRequest(); |
| | | sendMonitorDataRequest(expectedMonitoringMsg); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** |
| | | * Sends a MonitorRequest message to all connected RS. |
| | | * @return the number of requests sent. |
| | | * @param expectedMonitoringMsg The list of server handler we have to wait a |
| | | * monitoring message from. Will be filled as necessary by this method. |
| | | * @throws DirectoryException when a problem occurs. |
| | | */ |
| | | protected int sendMonitorDataRequest() |
| | | protected void sendMonitorDataRequest( |
| | | List<GlobalServerId> expectedMonitoringMsg) |
| | | throws DirectoryException |
| | | { |
| | | int sent = 0; |
| | | try |
| | | { |
| | | for (ServerHandler rs : replicationServers.values()) |
| | | { |
| | | int serverId = rs.getServerId(); |
| | | MonitorRequestMsg msg = |
| | | new MonitorRequestMsg(this.replicationServer.getServerId(), |
| | | rs.getServerId()); |
| | | serverId); |
| | | rs.send(msg); |
| | | sent++; |
| | | // Store the fact that we expect a MonitoringMsg back from this server |
| | | expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId)); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | message, e); |
| | | } |
| | | return sent; |
| | | } |
| | | |
| | | /** |
| | |
| | | * and stores the data received. |
| | | * |
| | | * @param msg The message to be processed. |
| | | * @param globalServerId server handler that is receiving the message. |
| | | */ |
| | | public void receivesMonitorDataResponse(MonitorMsg msg) |
| | | private void receivesMonitorDataResponse(MonitorMsg msg, |
| | | GlobalServerId globalServerId) |
| | | { |
| | | try |
| | | { |
| | |
| | | |
| | | // Decreases the number of expected responses and potentially |
| | | // wakes up the waiting requestor thread. |
| | | replicationServer.responseReceived(); |
| | | replicationServer.responseReceived(globalServerId); |
| | | |
| | | } catch (Exception e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates and starts the monitoring publisher of this domain, unless one |
| | | * already exists or the configured publisher period is not strictly |
| | | * positive. |
| | | */ |
| | | public void startMonitoringPublisher() |
| | | { |
| | |   if (monitoringPublisher != null) |
| | |   { |
| | |     // A publisher is already in place: nothing to do. |
| | |     return; |
| | |   } |
| | | |
| | |   final long publisherPeriod = |
| | |       replicationServer.getMonitoringPublisherPeriod(); |
| | |   if (publisherPeriod <= 0) |
| | |   { |
| | |     // 0 means no monitoring publisher |
| | |     return; |
| | |   } |
| | | |
| | |   monitoringPublisher = new MonitoringPublisher(this, publisherPeriod); |
| | |   monitoringPublisher.start(); |
| | | } |
| | | |
| | | /** |
| | | * Shuts down the monitoring publisher of this domain, waits for its |
| | | * shutdown to complete and forgets it. Does nothing when no publisher |
| | | * exists. |
| | | */ |
| | | public void stopMonitoringPublisher() |
| | | { |
| | |   if (monitoringPublisher == null) |
| | |   { |
| | |     return; |
| | |   } |
| | | |
| | |   monitoringPublisher.shutdown(); |
| | |   monitoringPublisher.waitForShutdown(); |
| | |   // Drop the reference so that a new publisher can be started later on. |
| | |   monitoringPublisher = null; |
| | | } |
| | | |
| | | /** |
| | | * Indicates whether this domain currently holds a monitoring publisher. |
| | | * @return True if a monitoring publisher exists for this domain, false |
| | | * otherwise. |
| | | */ |
| | | public boolean isRunningMonitoringPublisher() |
| | | { |
| | |   final boolean publisherExists = monitoringPublisher != null; |
| | |   return publisherExists; |
| | | } |
| | | |
| | | /** |
| | | * Hands the new period value over to the monitoring publisher of this |
| | | * domain. Does nothing when no publisher exists. |
| | | * @param period The new period value. |
| | | */ |
| | | public void updateMonitoringPublisher(long period) |
| | | { |
| | |   if (monitoringPublisher == null) |
| | |   { |
| | |     // No publisher to update. |
| | |     return; |
| | |   } |
| | | |
| | |   monitoringPublisher.setPeriod(period); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |