import org.forgerock.opendj.ldap.ResultCode;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;

  /**
   * Starts the status analyzer thread for this domain, unless the degraded
   * status threshold is 0 (which disables the analyzer) or an analyzer is
   * already running.
   */
  private void startStatusAnalyzer()
  {
    final int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread = new StatusAnalyzer(this);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
      }
    }
  }
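
  /**
   * Illustrative sketch only: the stopStatusAnalyzer() invoked below is not
   * part of this excerpt. Assuming StatusAnalyzer is a Thread subclass held
   * in the statusAnalyzer AtomicReference (as startStatusAnalyzer() above
   * suggests), a minimal shutdown counterpart could look like this; the
   * method name and shutdown mechanics here are assumptions, not the
   * project's actual code.
   */
  private void stopStatusAnalyzerSketch()
  {
    final StatusAnalyzer thread = statusAnalyzer.getAndSet(null);
    if (thread != null)
    {
      thread.interrupt(); // wake the analyzer from its polling sleep
      try
      {
        thread.join();
      }
      catch (InterruptedException e)
      {
        Thread.currentThread().interrupt(); // preserve the interrupt status
      }
    }
  }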

  /**
   * Updates the status analyzer with the new degraded status threshold.
   *
   * @param degradedStatusThreshold
   *          the new threshold value; 0 disables the analyzer
   */
  void updateDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (degradedStatusThreshold == 0)
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
      return;
    }

    if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
    {
      // Requested to start analyzers with the provided threshold value
      startStatusAnalyzer();
    }
  }
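
  /**
   * Illustrative usage sketch (hypothetical names; the caller is not part of
   * this excerpt): updateDegradedStatusThreshold() above is the kind of hook
   * a configuration change listener would drive when an administrator changes
   * the degraded-status-threshold setting.
   */
  private void onThresholdConfigChangeSketch(int newThresholdFromCfg)
  {
    // 0 stops the analyzer; a positive value (re)starts it while DSs are
    // connected. A running analyzer re-reads the live configuration value on
    // each pass, so no separate propagation to the thread is needed.
    updateDegradedStatusThreshold(newThresholdFromCfg);
  }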
| | | + " and port=" + localReplicationServer.getReplicationPort() |
| | | + ": " + message); |
| | | } |
| | | |
| | | |
| | | |

  /**
   * Goes through each connected DS, gets the number of pending changes for
   * it, and changes its status accordingly when the threshold value is
   * crossed or uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    // A threshold value of 0 means no status analyzer (no degrading system).
    // This should never happen here, because the status analyzer thread is
    // not created in that case, but keep this test as a sanity check.
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : getConnectedDSs().values())
      {
        // Get the number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (logger.isTraceEnabled())
        {
          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }

        // Check the status to know whether a change is relevant, without
        // taking the RSD lock to test. If we attempt to change the status
        // when the current status does not allow it, the changeStatus()
        // method will notice it. This way, the lock is only taken when
        // actually needed rather than on every polling pass.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }
}
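
/*
 * Illustrative sketch (hypothetical; the project's actual StatusAnalyzer is
 * not shown in this excerpt): checkDSDegradedStatus() above is designed to be
 * driven by a periodic polling thread. Assuming the enclosing class is
 * ReplicationServerDomain and a fixed poll period, the analyzer loop could be
 * as simple as:
 */
class StatusAnalyzerLoopSketch extends Thread
{
  private final ReplicationServerDomain domain; // assumed enclosing class
  private final long pollMillis; // assumed fixed polling period

  StatusAnalyzerLoopSketch(ReplicationServerDomain domain, long pollMillis)
  {
    super("Status analyzer sketch");
    this.domain = domain;
    this.pollMillis = pollMillis;
  }

  @Override
  public void run()
  {
    while (!isInterrupted())
    {
      // Cross/uncross the degraded status threshold for every connected DS.
      domain.checkDSDegradedStatus();
      try
      {
        Thread.sleep(pollMillis);
      }
      catch (InterruptedException e)
      {
        return; // interrupted by the stop path: exit the loop
      }
    }
  }
}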