| | |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.replication.common.StatusMachineEvent.*; |
| | | |
| | | /** |
| | | * This thread is in charge of periodically determining if the connected |
| | |
| | | } |
| | | |
| | | /** |
| | | * Run method for the StatusAnalyzer. |
| | | * Analyzes if servers are late or not, and changes their status accordingly. |
| | | */ |
| | | @Override |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Directory server status analyzer starting for dn " + |
| | | replicationServerDomain.getBaseDn()); |
| | | TRACER.debugInfo( |
| | | getMessage("Directory server status analyzer starting.")); |
| | | } |
| | | |
| | | final int localRsId = replicationServerDomain.getLocalRSServerId(); |
| | | boolean interrupted = false; |
| | | while (!shutdown && !interrupted) |
| | | while (!shutdown) |
| | | { |
| | | synchronized (shutdownLock) |
| | | { |
| | |
| | | // for it and change status accordingly if threshold value is |
| | | // crossed/uncrossed |
| | | for (DataServerHandler serverHandler : |
| | | replicationServerDomain.getConnectedDSs(). values()) |
| | | replicationServerDomain.getConnectedDSs().values()) |
| | | { |
| | | // Get number of pending changes for this server |
| | | int nChanges = serverHandler.getRcvMsgQueueSize(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Status analyzer for dn " |
| | | + replicationServerDomain.getBaseDn() + " DS " |
| | | TRACER.debugInfo(getMessage("Status analyzer: DS " |
| | | + serverHandler.getServerId() + " has " + nChanges |
| | | + " message(s) in writer queue. This is in RS " + localRsId); |
| | | + " message(s) in writer queue.")); |
| | | } |
| | | |
| | | // Check status to know if it is relevant to change the status. Do not |
| | | // take RSD lock to test. If we attempt to change the status whereas |
| | | // we are in a status that does not allow that, this will be noticed by |
| | | // the changeStatusFromStatusAnalyzer method. This allows to take the |
| | | // the current status does not allow it, this will be noticed by |
| | | // the changeStatusFromStatusAnalyzer() method. This allows to take the |
| | | // lock roughly only when needed versus every sleep time timeout. |
| | | if (degradedStatusThreshold > 0) |
| | | // Threshold value = 0 means no status analyzer (no degrading system) |
| | |
| | | { |
| | | if (nChanges >= degradedStatusThreshold) |
| | | { |
| | | if (serverHandler.getStatus() == ServerStatus.NORMAL_STATUS) |
| | | if (serverHandler.getStatus() == NORMAL_STATUS |
| | | && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT)) |
| | | { |
| | | interrupted = |
| | | replicationServerDomain.changeStatusFromStatusAnalyzer( |
| | | serverHandler, |
| | | StatusMachineEvent.TO_DEGRADED_STATUS_EVENT); |
| | | if (interrupted) |
| | | { |
| | | // Finish job and let thread die |
| | | TRACER.debugInfo("Status analyzer for dn " |
| | | + replicationServerDomain.getBaseDn() |
| | | + " has been interrupted and will die. This is in RS " |
| | | + localRsId); |
| | | break; |
| | | } |
| | | break; |
| | | } |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | if (serverHandler.getStatus() == ServerStatus.DEGRADED_STATUS) |
| | | if (serverHandler.getStatus() == DEGRADED_STATUS |
| | | && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT)) |
| | | { |
| | | interrupted = |
| | | replicationServerDomain.changeStatusFromStatusAnalyzer( |
| | | serverHandler, |
| | | StatusMachineEvent.TO_NORMAL_STATUS_EVENT); |
| | | if (interrupted) |
| | | { |
| | | // Finish job and let thread die |
| | | TRACER.debugInfo("Status analyzer for dn " |
| | | + replicationServerDomain.getBaseDn() |
| | | + " has been interrupted and will die. This is in RS " |
| | | + localRsId); |
| | | break; |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | done = true; |
| | | TRACER.debugInfo("Status analyzer for dn " |
| | | + replicationServerDomain.getBaseDn() + " is terminated." |
| | | + " This is in RS " + localRsId); |
| | | TRACER.debugInfo(getMessage("Status analyzer is terminated.")); |
| | | } |
| | | |
| | | /** Prefixes {@code message} with "In RS <local RS server id>, for base dn <base DN>: "; callers pass the result to TRACER.debugInfo. */ private String getMessage(String message) |
| | | { |
| | | return "In RS " + replicationServerDomain.getLocalRSServerId() |
| | | + ", for base dn " + replicationServerDomain.getBaseDn() + ": " |
| | | + message; |
| | | } |
| | | |
| | | /** Not a pure query: calls changeStatusFromStatusAnalyzer(serverHandler, event) and returns true (after tracing a debug message) when that call returns true, false otherwise. */ private boolean isInterrupted(DataServerHandler serverHandler, |
| | | StatusMachineEvent event) |
| | | { |
| | | if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler, |
| | | event)) |
| | | { |
| | | // A true result is taken here to mean the analyzer was interrupted (NOTE(review): confirm against changeStatusFromStatusAnalyzer): finish job and let thread die |
| | | TRACER.debugInfo( |
| | | getMessage("Status analyzer has been interrupted and will die.")); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Shutting down status analyzer for dn " |
| | | + replicationServerDomain.getBaseDn() |
| | | + " in RS " + replicationServerDomain.getLocalRSServerId()); |
| | | TRACER.debugInfo(getMessage("Shutting down status analyzer.")); |
| | | } |
| | | } |
| | | } |
| | |
| | | n++; |
| | | if (n >= FACTOR) |
| | | { |
| | | TRACER.debugInfo("Interrupting status analyzer for dn " + |
| | | replicationServerDomain.getBaseDn() + " in RS " + |
| | | replicationServerDomain.getLocalRSServerId()); |
| | | TRACER.debugInfo(getMessage("Interrupting status analyzer.")); |
| | | interrupt(); |
| | | } |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Directory server status analyzer for dn " + |
| | | replicationServerDomain.getBaseDn() + " changing threshold value to " + |
| | | degradedStatusThreshold); |
| | | TRACER.debugInfo(getMessage( |
| | | "Directory server status analyzer changing threshold value to " |
| | | + degradedStatusThreshold)); |
| | | } |
| | | |
| | | this.degradedStatusThreshold = degradedStatusThreshold; |