/*
 * Copyright 2008-2009 Sun Microsystems, Inc.
 * Portions Copyright 2011-2014 ForgeRock AS
 */
| | | package org.opends.server.replication.server; |
| | | |
| | | |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.replication.common.StatusMachineEvent.*; |
| | | |
| | | |
/**
 * This thread is in charge of periodically determining if the connected
 * DSs are late regarding the changes they have to replay: when the number of
 * pending changes for a DS crosses the configured threshold, its status must
 * be changed to DEGRADED_STATUS, and when the threshold is uncrossed, the
 * status analyzer must make the DS status change back to NORMAL_STATUS. To
 * have meaning of status, please refer to ServerStatus class.
 * <p>
 * In addition, this thread is responsible for publishing any pending status
 * messages.
 */
| | | public class StatusAnalyzer extends DirectoryThread |
| | | class StatusAnalyzer extends DirectoryThread |
| | | { |
| | | |
| | | private volatile boolean shutdown = false; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private volatile int degradedStatusThreshold = -1; |
| | | |
| | | /** Sleep time for the thread, in ms. */ |
| | | private static final int STATUS_ANALYZER_SLEEP_TIME = 5000; |
| | | |
| | | private volatile boolean done = false; |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private final Object eventMonitor = new Object(); |
| | | private boolean pendingStatusMessage = false; |
| | | private long nextCheckDSDegradedStatusTime; |
| | | |
| | | private final Object shutdownLock = new Object(); |
| | | |
| | | |
| | | /** |
| | | * Create a StatusAnalyzer. |
| | | * @param replicationServerDomain The ReplicationServerDomain the status |
| | | * analyzer is for. |
| | | * @param degradedStatusThreshold The pending changes threshold value to be |
| | | * used for putting a DS in DEGRADED_STATUS. |
| | | * |
| | | * @param replicationServerDomain |
| | | * The ReplicationServerDomain the status analyzer is for. |
| | | */ |
| | | public StatusAnalyzer(ReplicationServerDomain replicationServerDomain, |
| | | int degradedStatusThreshold) |
| | | StatusAnalyzer(ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getLocalRSServerId() |
| | | + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN() |
| | | + ") status monitor for domain \"" |
| | | + replicationServerDomain.getBaseDN() |
| | | + "\""); |
| | | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | this.degradedStatusThreshold = degradedStatusThreshold; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Analyzes if servers are late or not, and change their status accordingly. |
| | | */ |
| | |
| | | getMessage("Directory server status analyzer starting.")); |
| | | } |
| | | |
| | | while (!shutdown) |
| | | try |
| | | { |
| | | synchronized (shutdownLock) |
| | | while (true) |
| | | { |
| | | if (!shutdown) |
| | | final boolean requestStatusBroadcastWasRequested; |
| | | synchronized (eventMonitor) |
| | | { |
| | | try |
| | | if (!isShutdownInitiated() && !pendingStatusMessage) |
| | | { |
| | | shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME); |
| | | eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Server shutdown monitor may interrupt slow threads. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | shutdown = true; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Go through each connected DS, get the number of pending changes we have |
| | | // for it and change status accordingly if threshold value is |
| | | // crossed/uncrossed |
| | | for (DataServerHandler serverHandler : |
| | | replicationServerDomain.getConnectedDSs().values()) |
| | | { |
| | | // Get number of pending changes for this server |
| | | int nChanges = serverHandler.getRcvMsgQueueSize(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(getMessage("Status analyzer: DS " |
| | | + serverHandler.getServerId() + " has " + nChanges |
| | | + " message(s) in writer queue.")); |
| | | requestStatusBroadcastWasRequested = pendingStatusMessage; |
| | | pendingStatusMessage = false; |
| | | } |
| | | |
| | | // Check status to know if it is relevant to change the status. Do not |
| | | // take RSD lock to test. If we attempt to change the status whereas |
| | | // the current status does allow it, this will be noticed by |
| | | // the changeStatusFromStatusAnalyzer() method. This allows to take the |
| | | // lock roughly only when needed versus every sleep time timeout. |
| | | if (degradedStatusThreshold > 0) |
| | | // Threshold value = 0 means no status analyzer (no degrading system) |
| | | // we should not have that as the status analyzer thread should not be |
| | | // created if this is the case, but for sanity purpose, we add this |
| | | // test |
| | | if (isShutdownInitiated()) |
| | | { |
| | | if (nChanges >= degradedStatusThreshold) |
| | | { |
| | | if (serverHandler.getStatus() == NORMAL_STATUS |
| | | && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT)) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (serverHandler.getStatus() == DEGRADED_STATUS |
| | | && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT)) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | break; |
| | | } |
| | | |
| | | // Broadcast heartbeats, topology messages, etc if requested. |
| | | if (requestStatusBroadcastWasRequested) |
| | | { |
| | | replicationServerDomain.sendPendingStatusMessages(); |
| | | } |
| | | |
| | | /* |
| | | * Check the degraded status for connected DS instances only if |
| | | * sufficient time has passed. The current time is not cached because |
| | | * the call to checkDSDegradedStatus may take some time. |
| | | */ |
| | | if (nextCheckDSDegradedStatusTime < System.currentTimeMillis()) |
| | | { |
| | | replicationServerDomain.checkDSDegradedStatus(); |
| | | nextCheckDSDegradedStatusTime = System.currentTimeMillis() |
| | | + STATUS_ANALYZER_SLEEP_TIME; |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Forcefully stopped. |
| | | } |
| | | |
| | | done = true; |
| | | TRACER.debugInfo(getMessage("Status analyzer is terminated.")); |
| | | } |
| | | |
| | | |
| | | |
| | | private String getMessage(String message) |
| | | { |
| | | return "In RS " + replicationServerDomain.getLocalRSServerId() |
| | |
| | | + message; |
| | | } |
| | | |
| | | private boolean isInterrupted(DataServerHandler serverHandler, |
| | | StatusMachineEvent event) |
| | | { |
| | | if (replicationServerDomain.changeStatus(serverHandler, event)) |
| | | { |
| | | // Finish job and let thread die |
| | | TRACER.debugInfo( |
| | | getMessage("Status analyzer has been interrupted and will die.")); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Stops the thread. |
| | | */ |
| | | public void shutdown() |
| | | void shutdown() |
| | | { |
| | | synchronized (shutdownLock) |
| | | { |
| | | shutdown = true; |
| | | shutdownLock.notifyAll(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(getMessage("Shutting down status analyzer.")); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Waits for analyzer death. If not finished within 2 seconds, |
| | | * forces interruption |
| | | */ |
| | | public void waitForShutdown() |
| | | { |
| | | try |
| | | { |
| | | int FACTOR = 40; // Wait for 2 seconds before interrupting the thread |
| | | int n = 0; |
| | | while (!done && this.isAlive()) |
| | | { |
| | | Thread.sleep(50); |
| | | n++; |
| | | if (n >= FACTOR) |
| | | { |
| | | TRACER.debugInfo(getMessage("Interrupting status analyzer.")); |
| | | interrupt(); |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | | // exit the loop if this thread is interrupted. |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sets the threshold value. |
| | | * @param degradedStatusThreshold The new threshold value. |
| | | */ |
| | | public void setDegradedStatusThreshold(int degradedStatusThreshold) |
| | | { |
| | | initiateShutdown(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(getMessage( |
| | | "Directory server status analyzer changing threshold value to " |
| | | + degradedStatusThreshold)); |
| | | TRACER.debugInfo(getMessage("Shutting down status analyzer.")); |
| | | } |
| | | synchronized (eventMonitor) |
| | | { |
| | | eventMonitor.notifyAll(); |
| | | } |
| | | try |
| | | { |
| | | join(2000); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Trapped: forcefully stop the thread. |
| | | } |
| | | if (isAlive()) |
| | | { |
| | | // The join timed out or was interrupted so attempt to forcefully stop the |
| | | // analyzer. |
| | | interrupt(); |
| | | } |
| | | } |
| | | |
| | | this.degradedStatusThreshold = degradedStatusThreshold; |
| | | |
| | | |
| | | /** |
| | | * Requests that a topology state related message be broadcast to the rest of |
| | | * the topology. Messages include DS heartbeats, topology information, etc. |
| | | */ |
| | | void notifyPendingStatusMessage() |
| | | { |
| | | synchronized (eventMonitor) |
| | | { |
| | | pendingStatusMessage = true; |
| | | eventMonitor.notifyAll(); |
| | | } |
| | | } |
| | | } |