| | |
| | | * the threshold is uncrossed, the status analyzer must make the DS status |
| | | * change back to NORMAL_STATUS. To have meaning of status, please refer to |
| | | * ServerStatus class. |
| | | * <p> |
| | | * In addition, this thread is responsible for publishing any pending status |
| | | * messages. |
| | | */ |
| | | public class StatusAnalyzer extends DirectoryThread |
| | | class StatusAnalyzer extends DirectoryThread |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger |
| | | .getLoggerForThisClass(); |
| | |
| | | private static final int STATUS_ANALYZER_SLEEP_TIME = 5000; |
| | | |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private final Object shutdownLock = new Object(); |
| | | private volatile boolean shutdown = false; |
| | | private volatile boolean done = false; |
| | | private final Object eventMonitor = new Object(); |
| | | private boolean pendingStatusMessage = false; |
| | | private long nextCheckDSDegradedStatusTime; |
| | | |
| | | |
| | | |
| | |
| | | * @param replicationServerDomain |
| | | * The ReplicationServerDomain the status analyzer is for. |
| | | */ |
| | | public StatusAnalyzer(ReplicationServerDomain replicationServerDomain) |
| | | StatusAnalyzer(ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getLocalRSServerId() |
| | | + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN() |
| | | + ") status monitor for domain \"" |
| | | + replicationServerDomain.getBaseDN() |
| | | + "\""); |
| | | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | } |
| | | |
| | |
| | | logger.trace(getMessage("Directory server status analyzer starting.")); |
| | | } |
| | | |
| | | while (!shutdown) |
| | | try |
| | | { |
| | | synchronized (shutdownLock) |
| | | while (true) |
| | | { |
| | | if (!shutdown) |
| | | final boolean requestStatusBroadcastWasRequested; |
| | | synchronized (eventMonitor) |
| | | { |
| | | try |
| | | if (!isShutdownInitiated() && !pendingStatusMessage) |
| | | { |
| | | shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME); |
| | | eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Server shutdown monitor may interrupt slow threads. |
| | | logger.traceException(e); |
| | | shutdown = true; |
| | | break; |
| | | } |
| | | requestStatusBroadcastWasRequested = pendingStatusMessage; |
| | | pendingStatusMessage = false; |
| | | } |
| | | |
| | | if (isShutdownInitiated()) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | // Broadcast heartbeats, topology messages, etc if requested. |
| | | if (requestStatusBroadcastWasRequested) |
| | | { |
| | | replicationServerDomain.sendPendingStatusMessages(); |
| | | } |
| | | |
| | | /* |
| | | * Check the degraded status for connected DS instances only if |
| | | * sufficient time has passed. The current time is not cached because |
| | | * the call to checkDSDegradedStatus may take some time. |
| | | */ |
| | | if (nextCheckDSDegradedStatusTime < System.currentTimeMillis()) |
| | | { |
| | | replicationServerDomain.checkDSDegradedStatus(); |
| | | nextCheckDSDegradedStatusTime = System.currentTimeMillis() |
| | | + STATUS_ANALYZER_SLEEP_TIME; |
| | | } |
| | | } |
| | | |
| | | if (shutdown) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | replicationServerDomain.checkDSDegradedStatus(); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Forcefully stopped. |
| | | } |
| | | |
| | | done = true; |
| | | logger.trace(getMessage("Status analyzer is terminated.")); |
| | | } |
| | | |
| | |
| | | /** |
| | | * Stops the thread. |
| | | */ |
| | | public void shutdown() |
| | | void shutdown() |
| | | { |
| | | synchronized (shutdownLock) |
| | | initiateShutdown(); |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | shutdown = true; |
| | | shutdownLock.notifyAll(); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace(getMessage("Shutting down status analyzer.")); |
| | | } |
| | | logger.trace(getMessage("Shutting down status analyzer.")); |
| | | } |
| | | synchronized (eventMonitor) |
| | | { |
| | | eventMonitor.notifyAll(); |
| | | } |
| | | try |
| | | { |
| | | join(2000); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Trapped: forcefully stop the thread. |
| | | } |
| | | if (isAlive()) |
| | | { |
| | | // The join timed out or was interrupted so attempt to forcefully stop the |
| | | // analyzer. |
| | | interrupt(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Waits for analyzer death. If not finished within 2 seconds, forces |
| | | * interruption |
| | | * Requests that a topology state related message be broadcast to the rest of |
| | | * the topology. Messages include DS heartbeats, topology information, etc. |
| | | */ |
| | | public void waitForShutdown() |
| | | void notifyPendingStatusMessage() |
| | | { |
| | | try |
| | | synchronized (eventMonitor) |
| | | { |
| | | int FACTOR = 40; // Wait for 2 seconds before interrupting the thread |
| | | int n = 0; |
| | | while (!done && this.isAlive()) |
| | | { |
| | | Thread.sleep(50); |
| | | n++; |
| | | if (n >= FACTOR) |
| | | { |
| | | logger.trace(getMessage("Interrupting status analyzer.")); |
| | | interrupt(); |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // exit the loop if this thread is interrupted. |
| | | pendingStatusMessage = true; |
| | | eventMonitor.notifyAll(); |
| | | } |
| | | } |
| | | } |