| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.replication.common.StatusMachineEvent.*; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.api.DirectoryThread; |
| | | |
| | | |
| | | |
| | | /** |
| | | * This thread is in charge of periodically determining if the connected |
| | |
| | | */ |
| | | public class StatusAnalyzer extends DirectoryThread |
| | | { |
| | | |
| | | private volatile boolean shutdown = false; |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private volatile int degradedStatusThreshold = -1; |
| | | private static final LocalizedLogger logger = LocalizedLogger |
| | | .getLoggerForThisClass(); |
| | | |
| | | /** Sleep time for the thread, in ms. */ |
| | | private static final int STATUS_ANALYZER_SLEEP_TIME = 5000; |
| | | |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private final Object shutdownLock = new Object(); |
| | | private volatile boolean shutdown = false; |
| | | private volatile boolean done = false; |
| | | |
| | | private final Object shutdownLock = new Object(); |
| | | |
| | | |
| | | /** |
| | | * Create a StatusAnalyzer. |
| | | * @param replicationServerDomain The ReplicationServerDomain the status |
| | | * analyzer is for. |
| | | * @param degradedStatusThreshold The pending changes threshold value to be |
| | | * used for putting a DS in DEGRADED_STATUS. |
| | | * |
| | | * @param replicationServerDomain |
| | | * The ReplicationServerDomain the status analyzer is for. |
| | | */ |
| | | public StatusAnalyzer(ReplicationServerDomain replicationServerDomain, |
| | | int degradedStatusThreshold) |
| | | public StatusAnalyzer(ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getLocalRSServerId() |
| | |
| | | + "\""); |
| | | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | this.degradedStatusThreshold = degradedStatusThreshold; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Analyzes if servers are late or not, and change their status accordingly. |
| | | */ |
| | |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace( |
| | | getMessage("Directory server status analyzer starting.")); |
| | | logger.trace(getMessage("Directory server status analyzer starting.")); |
| | | } |
| | | |
| | | while (!shutdown) |
| | |
| | | } |
| | | } |
| | | |
| | | // Go through each connected DS, get the number of pending changes we have |
| | | // for it and change status accordingly if threshold value is |
| | | // crossed/uncrossed |
| | | for (DataServerHandler serverHandler : |
| | | replicationServerDomain.getConnectedDSs().values()) |
| | | if (shutdown) |
| | | { |
| | | // Get number of pending changes for this server |
| | | int nChanges = serverHandler.getRcvMsgQueueSize(); |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace(getMessage("Status analyzer: DS " |
| | | + serverHandler.getServerId() + " has " + nChanges |
| | | + " message(s) in writer queue.")); |
| | | } |
| | | |
| | | // Check status to know if it is relevant to change the status. Do not |
| | | // take RSD lock to test. If we attempt to change the status whereas |
| | | // the current status does allow it, this will be noticed by |
| | | // the changeStatusFromStatusAnalyzer() method. This allows to take the |
| | | // lock roughly only when needed versus every sleep time timeout. |
| | | if (degradedStatusThreshold > 0) |
| | | // Threshold value = 0 means no status analyzer (no degrading system) |
| | | // we should not have that as the status analyzer thread should not be |
| | | // created if this is the case, but for sanity purpose, we add this |
| | | // test |
| | | { |
| | | if (nChanges >= degradedStatusThreshold) |
| | | { |
| | | if (serverHandler.getStatus() == NORMAL_STATUS |
| | | && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT)) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (serverHandler.getStatus() == DEGRADED_STATUS |
| | | && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT)) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | break; |
| | | } |
| | | |
| | | replicationServerDomain.checkDSDegradedStatus(); |
| | | } |
| | | |
| | | done = true; |
| | | logger.trace(getMessage("Status analyzer is terminated.")); |
| | | } |
| | | |
| | | |
| | | |
| | | private String getMessage(String message) |
| | | { |
| | | return "In RS " + replicationServerDomain.getLocalRSServerId() |
| | |
| | | + message; |
| | | } |
| | | |
| | | private boolean isInterrupted(DataServerHandler serverHandler, |
| | | StatusMachineEvent event) |
| | | { |
| | | if (replicationServerDomain.changeStatus(serverHandler, event)) |
| | | { |
| | | // Finish job and let thread die |
| | | logger.trace( |
| | | getMessage("Status analyzer has been interrupted and will die.")); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Stops the thread. |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Waits for analyzer death. If not finished within 2 seconds, |
| | | * forces interruption |
| | | * Waits for analyzer death. If not finished within 2 seconds, forces |
| | | * interruption |
| | | */ |
| | | public void waitForShutdown() |
| | | { |
| | |
| | | interrupt(); |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // exit the loop if this thread is interrupted. |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sets the threshold value. |
| | | * @param degradedStatusThreshold The new threshold value. |
| | | */ |
| | | public void setDegradedStatusThreshold(int degradedStatusThreshold) |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace(getMessage( |
| | | "Directory server status analyzer changing threshold value to " |
| | | + degradedStatusThreshold)); |
| | | } |
| | | |
| | | this.degradedStatusThreshold = degradedStatusThreshold; |
| | | } |
| | | } |