| | |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import java.util.Collections; |
| | | |
| | | /** |
| | | * ReplicationServer Listener. |
| | |
| | | // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled |
| | | private int degradedStatusThreshold = 5000; |
| | | |
| | | // Number of milliseconds to wait before sending new monitoring messages. |
| | | // If value is 0, monitoring publisher is disabled |
| | | private long monitoringPublisherPeriod = 3000; |
| | | |
| | | // The handler of the draft change numbers database, the database used to |
| | | // store the relation between a draft change number ('seqnum') and the |
| | | // associated cookie. |
| | |
| | | private int weight = 1; |
| | | |
| | | /** |
| | | * Holds the list of all replication servers instantiated in this VM. |
| | | * This allows to perform clean up of the RS databases in unit tests. |
| | | */ |
| | | private static List<ReplicationServer> allInstances = |
| | | new ArrayList<ReplicationServer>(); |
| | | |
| | | /** |
| | | * Creates a new Replication server using the provided configuration entry. |
| | | * |
| | | * @param configuration The configuration of this replication server. |
| | |
| | | groupId = (byte)configuration.getGroupId(); |
| | | assuredTimeout = configuration.getAssuredTimeout(); |
| | | degradedStatusThreshold = configuration.getDegradedStatusThreshold(); |
| | | monitoringPublisherPeriod = configuration.getMonitoringPeriod(); |
| | | |
| | | replSessionSecurity = new ReplSessionSecurity(); |
| | | initialize(replicationPort); |
| | |
| | | DirectoryServer.registerImportTaskListener(this); |
| | | |
| | | localPorts.add(replicationPort); |
| | | |
| | | // Keep track of this new instance |
| | | allInstances.add(this); |
| | | } |
| | | |
| | | /** |
| | | * Get the list of every replication servers instantiated in the current VM. |
| | | * @return The list of every replication servers instantiated in the current |
| | | * VM. |
| | | */ |
| | | public static List<ReplicationServer> getAllInstances() |
| | | { |
| | | return allInstances; |
| | | } |
| | | |
| | | /** |
| | | * The run method for the Listen thread. |
| | |
| | | dbEnv.shutdown(); |
| | | } |
| | | |
| | | } |
| | | // Remove this instance from the global instance list |
| | | allInstances.remove(this); |
| | | } |
| | | |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | |
| | | // Update period value for monitoring publishers (stop them if requested |
| | | // value is 0) |
| | | if (monitoringPublisherPeriod != configuration.getMonitoringPeriod()) |
| | | { |
| | | long oldMonitoringPeriod = monitoringPublisherPeriod; |
| | | monitoringPublisherPeriod = configuration.getMonitoringPeriod(); |
| | | for(ReplicationServerDomain rsd : baseDNs.values()) |
| | | { |
| | | if (monitoringPublisherPeriod == 0L) |
| | | { |
| | | // Requested to stop monitoring publishers |
| | | rsd.stopMonitoringPublisher(); |
| | | } else if (rsd.isRunningMonitoringPublisher()) |
| | | { |
| | | // Update the threshold value for this running monitoring publisher |
| | | rsd.updateMonitoringPublisher(monitoringPublisherPeriod); |
| | | } else if (oldMonitoringPeriod == 0L) |
| | | { |
| | | // Requested to start monitoring publishers with provided period value |
| | | if ( (rsd.getConnectedDSs().size() > 0) || |
| | | (rsd.getConnectedRSs().size() > 0) ) |
| | | rsd.startMonitoringPublisher(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Changed the group id ? |
| | | byte newGroupId = (byte)configuration.getGroupId(); |
| | | if (newGroupId != groupId) |
| | |
| | | if (weight != configuration.getWeight()) |
| | | { |
| | | weight = configuration.getWeight(); |
| | | // TODO: send new TopologyMsg |
| | | // Broadcast the new weight the the whole topology. This will make some |
| | | // DSs reconnect (if needed) to other RSs according to the new weight of |
| | | // this RS. |
| | | broadcastConfigChange(); |
| | | } |
| | | |
| | | if ((configuration.getReplicationDBDirectory() != null) && |
| | |
| | | } |
| | | |
| | | /** |
| | | * Broadcast a configuration change that just happened to the whole topology |
| | | * by sending a TopologyMsg to every entity in the topology. |
| | | */ |
| | | private void broadcastConfigChange() |
| | | { |
| | | for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) |
| | | { |
| | | replicationServerDomain.buildAndSendTopoInfoToDSs(null); |
| | | replicationServerDomain.buildAndSendTopoInfoToRSs(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean isConfigurationChangeAcceptable( |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the monitoring publisher period value. |
| | | * @return the monitoring publisher period value. |
| | | */ |
| | | public long getMonitoringPublisherPeriod() |
| | | { |
| | | return monitoringPublisherPeriod; |
| | | } |
| | | |
| | | /** |
| | | * Compute the list of replication servers that are not any |
| | | * more connected to this Replication Server and stop the |
| | | * corresponding handlers. |
| | |
| | | /* The date of the last time they have been elaborated */ |
| | | private long monitorDataLastBuildDate = 0; |
| | | |
| | | /* Search op on monitor data is processed by a worker thread. |
| | | * Requests are sent to the other RS,and responses are received by the |
| | | * listener threads. |
| | | * The worker thread is awoke on this semaphore, or on timeout. |
| | | /** |
| | | * This uniquely identifies a server (handler) in the cross-domain topology. |
| | | * Represents an identifier of a handler (in the whole RS) we have to wait a |
| | | * monitoring message from before answering to a monitor request. |
| | | */ |
| | | Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0); |
| | | public static class GlobalServerId { |
| | | |
| | | private int serverId = -1; |
| | | private String baseDn = null; |
| | | |
| | | /** |
| | | * Constructor for a global server id. |
| | | * @param baseDn The dn of the RSD owning the handler. |
| | | * @param serverId The handler id in the matching RSD. |
| | | */ |
| | | public GlobalServerId(String baseDn, int serverId) { |
| | | this.baseDn = baseDn; |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | | * Get the server handler id. |
| | | * @return the serverId |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Get the base dn. |
| | | * @return the baseDn |
| | | */ |
| | | public String getBaseDn() |
| | | { |
| | | return baseDn; |
| | | } |
| | | |
| | | /** |
| | | * Get the hascode. |
| | | * @return The hashcode. |
| | | */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | int hash = 7; |
| | | hash = 43 * hash + this.serverId; |
| | | hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0); |
| | | return hash; |
| | | } |
| | | |
| | | /** |
| | | * Tests if the passed global server handler id represents the same server |
| | | * handler as this one. |
| | | * @param obj The object to test. |
| | | * @return True if both identifiers are the same. |
| | | */ |
| | | public boolean equals(Object obj) { |
| | | if ( (obj == null) || (obj instanceof GlobalServerId)) |
| | | return false; |
| | | |
| | | GlobalServerId globalServerId = (GlobalServerId)obj; |
| | | return ( globalServerId.baseDn.equals(baseDn) && |
| | | (globalServerId.serverId == serverId) ); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This gives the list of server handlers we are willing to wait monitoring |
| | | * message from. Each time a monitoring message is received by a server |
| | | * handler, the matching server handler id is retired from the list. When the |
| | | * list is empty, we received all expected monitoring messages. |
| | | */ |
| | | private List<GlobalServerId> expectedMonitoringMsg = null; |
| | | |
| | | /** |
| | | * Trigger the computation of the Global Monitoring Data. |
| | |
| | | * |
| | | * @throws DirectoryException If the computation cannot be achieved. |
| | | */ |
| | | public void computeMonitorData() throws DirectoryException |
| | | public synchronized void computeMonitorData() throws DirectoryException |
| | | { |
| | | if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime()) |
| | | { |
| | |
| | | return; |
| | | } |
| | | |
| | | remoteMonitorResponsesSemaphore.drainPermits(); |
| | | int count = 0; |
| | | // Initialize the list of server handlers we expect monitoring messages from |
| | | expectedMonitoringMsg = |
| | | Collections.synchronizedList(new ArrayList<GlobalServerId>()); |
| | | |
| | | for (ReplicationServerDomain domain : baseDNs.values()) |
| | | { |
| | | count += domain.initializeMonitorData(); |
| | | domain.initializeMonitorData(expectedMonitoringMsg); |
| | | } |
| | | |
| | | // Wait for responses |
| | | waitMonitorDataResponses(count); |
| | | waitMonitorDataResponses(); |
| | | |
| | | for (ReplicationServerDomain domain : baseDNs.values()) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Wait for the expected count of received MonitorMsg. |
| | | * @param expectedResponses The number of expected answers. |
| | | * Wait for the expected received MonitorMsg. |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | private void waitMonitorDataResponses(int expectedResponses) |
| | | private void waitMonitorDataResponses() |
| | | throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + getMonitorInstanceName() + " baseDn=" + |
| | | " waiting for " + expectedResponses + " expected monitor messages"); |
| | | "In " + getMonitorInstanceName() + |
| | | " waiting for " + expectedMonitoringMsg.size() + |
| | | " expected monitor messages"); |
| | | |
| | | boolean allPermitsAcquired = |
| | | remoteMonitorResponsesSemaphore.tryAcquire( |
| | | expectedResponses, |
| | | (long) 5000, TimeUnit.MILLISECONDS); |
| | | |
| | | if (!allPermitsAcquired) |
| | | // Wait up to 5 seconds for every expected monitoring message to come |
| | | // back. |
| | | boolean allReceived = false; |
| | | long startTime = TimeThread.getTime(); |
| | | long curTime = startTime; |
| | | int maxTime = 5000; |
| | | while ( (curTime - startTime) < maxTime ) |
| | | { |
| | | monitorDataLastBuildDate = TimeThread.getTime(); |
| | | // Have every expected monitoring messages arrived ? |
| | | if (expectedMonitoringMsg.size() == 0) |
| | | { |
| | | // Ok break the loop |
| | | allReceived = true; |
| | | break; |
| | | } |
| | | Thread.sleep(100); |
| | | curTime = TimeThread.getTime(); |
| | | } |
| | | |
| | | monitorDataLastBuildDate = TimeThread.getTime(); |
| | | |
| | | if (!allReceived) |
| | | { |
| | | logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); |
| | | // let's go on in best effort even with limited data received. |
| | | // let's go on in best effort even with limited data received. |
| | | } else |
| | | { |
| | | monitorDataLastBuildDate = TimeThread.getTime(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + getMonitorInstanceName() + " baseDn=" + |
| | | " Successfully received all " + expectedResponses + |
| | | " expected monitor messages"); |
| | | "In " + getMonitorInstanceName() + |
| | | " Successfully received all expected monitor messages"); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | |
| | | |
| | | /** |
| | | * This should be called by each ReplicationServerDomain that receives |
| | | * a response to a monitor request message. |
| | | * a response to a monitor request message. This may also be called when a |
| | | * monitoring message is coming from a RS whose monitoring publisher thread |
| | | * sent it. As monitoring messages (sent because of monitoring request or |
| | | * because of monitoring publisher) have the same content, this is also ok |
| | | * to mark ok the server when the monitoring message coms from a monitoring |
| | | * publisher thread. |
| | | * @param globalServerId The server handler that is receiving the |
| | | * monitoring message. |
| | | */ |
| | | public void responseReceived() |
| | | public void responseReceived(GlobalServerId globalServerId) |
| | | { |
| | | remoteMonitorResponsesSemaphore.release(); |
| | | expectedMonitoringMsg.remove(globalServerId); |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public void responseReceivedAll() |
| | | { |
| | | remoteMonitorResponsesSemaphore.notifyAll(); |
| | | expectedMonitoringMsg.clear(); |
| | | } |
| | | |
| | | /** |