| | |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.Iterator; |
| | | |
| | |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | |
| | | |
| | | /* Monitor data management: cached, topology-wide monitoring state. */ |
| | | |
| | | // TODO: make the remote monitor data cache lifetime (currently 500 ms) configurable |
| | | private long monitorDataLifeTime = 500; |
| | | |
| | | /* A search operation on monitor data is processed by a worker thread. |
| | | * Requests are sent to the other RSes, and the responses are received by |
| | | * the listener threads. |
| | | * The worker thread is woken up through this semaphore, or on timeout. |
| | | * The field is reset to null once a wait has ended, so a null value is |
| | | * treated as "no collection currently in progress". |
| | | */ |
| | | Semaphore remoteMonitorResponsesSemaphore; |
| | | /** |
| | | * The monitor data consolidated over the topology. |
| | | * This is the reference snapshot handed back by computeMonitorData(); |
| | | * wrkMonitorData is the snapshot under construction that replaces it |
| | | * in completeMonitorData(). |
| | | */ |
| | | private MonitorData monitorData = new MonitorData(); |
| | | private MonitorData wrkMonitorData; |
| | | private Object monitorDataLock = new Object(); |
| | | |
| | | /** |
| | | * The needed info for each received assured update message we are waiting |
| | |
| | | synchronized protected MonitorData computeMonitorData() |
| | | throws DirectoryException |
| | | { |
| | | if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " getRemoteMonitorData in cache"); |
| | | // The current data are still valid. No need to renew them. |
| | | return monitorData; |
| | | } |
| | | // Update the monitorData of all domains if this was necessary. |
| | | replicationServer.computeMonitorData(); |
| | | return monitorData; |
| | | } |
| | | |
| | | wrkMonitorData = new MonitorData(); |
| | | synchronized (wrkMonitorData) |
| | | /** |
| | | * Start collecting global monitoring information for this |
| | | * ReplicationServerDomain. |
| | | * |
| | | * @return The number of responses that should come back. |
| | | * |
| | | * @throws DirectoryException In case the monitoring information could |
| | | * not be collected. |
| | | */ |
| | | |
| | | int initializeMonitorData() throws DirectoryException |
| | | { |
| | | synchronized (monitorDataLock) |
| | | { |
| | | wrkMonitorData = new MonitorData(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " Computing monitor data "); |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " Computing monitor data "); |
| | | |
| | | // Let's process our directly connected LSes |
| | | // - in the ServerHandler for a given LS1, the stored state contains : |
| | |
| | | wrkMonitorData.setMaxCN(serverID, maxcn); |
| | | wrkMonitorData.setLDAPServerState(serverID, directlshState); |
| | | wrkMonitorData.setFirstMissingDate(serverID, |
| | | directlsh.getApproxFirstMissingDate()); |
| | | directlsh.getApproxFirstMissingDate()); |
| | | } |
| | | |
| | | // Then initialize the max CN for the LS that produced something |
| | |
| | | // and we need the remote ones. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " Local monitor data: " + |
| | | wrkMonitorData.toString()); |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " Local monitor data: " + |
| | | wrkMonitorData.toString()); |
| | | } |
| | | |
| | | // Send Request to the other Replication Servers |
| | | if (remoteMonitorResponsesSemaphore == null) |
| | | { |
| | | remoteMonitorResponsesSemaphore = new Semaphore(0); |
| | | short requestCnt = sendMonitorDataRequest(); |
| | | // Wait for their responses, or until the timeout expires |
| | | waitMonitorDataResponses(requestCnt); |
| | | } else |
| | | { |
| | | // The processing of renewing the monitor cache is already running |
| | | // We'll make it sleeping until the end |
| | | // TODO: unit test for this case. |
| | | while (remoteMonitorResponsesSemaphore != null) |
| | | { |
| | | waitMonitorDataResponses(1); |
| | | } |
| | | } |
| | | // Send the request for remote monitor data to the |
| | | return sendMonitorDataRequest(); |
| | | } |
| | | |
| | | /** |
| | | * Complete all the calculation when all monitoring information |
| | | * has been received. |
| | | * <p> |
| | | * Calls completeComputing() on the working data (wrkMonitorData), then, |
| | | * under a lock, stores it as the reference monitorData, clears the |
| | | * working reference and traces the result when debugging is enabled. |
| | | * <p> |
| | | * NOTE(review): the rows of this method appear to interleave two |
| | | * revisions: two stacked synchronized headers (on monitorData and on |
| | | * monitorDataLock), repeated trace-string rows after the debugInfo call, |
| | | * and a "return monitorData" inside a void method. Confirm against the |
| | | * real source file which revision is current before changing the logic. |
| | | */ |
| | | void completeMonitorData() |
| | | { |
| | | wrkMonitorData.completeComputing(); |
| | | |
| | | // Store the new computed data as the reference |
| | | synchronized (monitorData) |
| | | synchronized (monitorDataLock) |
| | | { |
| | | // Now we have the expected answers or an error occurred |
| | | monitorData = wrkMonitorData; |
| | | wrkMonitorData = null; |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " *** Computed MonitorData: " + |
| | | monitorData.toString()); |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " *** Computed MonitorData: " + |
| | | monitorData.toString()); |
| | | } |
| | | return monitorData; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return the number of requests sent. |
| | | * @throws DirectoryException when a problem occurs. |
| | | */ |
| | | protected short sendMonitorDataRequest() |
| | | protected int sendMonitorDataRequest() |
| | | throws DirectoryException |
| | | { |
| | | short sent = 0; |
| | | int sent = 0; |
| | | try |
| | | { |
| | | for (ServerHandler rs : replicationServers.values()) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Wait for the expected count of received MonitorMsg. |
| | | * <p> |
| | | * Blocks on remoteMonitorResponsesSemaphore until the expected number of |
| | | * permits has been acquired or a 5000 ms timeout elapses. A timeout is |
| | | * logged and processing goes on in best effort with the data received so |
| | | * far. Whatever the outcome, the semaphore field is reset to null when |
| | | * this method returns. |
| | | * <p> |
| | | * If the waiting thread is interrupted, the error is logged as for any |
| | | * other failure and the thread's interrupt status is restored so that |
| | | * callers can still observe the interruption. |
| | | * |
| | | * @param expectedResponses The number of expected answers. |
| | | * @throws DirectoryException When an error occurs (the failures handled |
| | | *         in this body are logged rather than rethrown). |
| | | */ |
| | | protected void waitMonitorDataResponses(int expectedResponses) |
| | | throws DirectoryException |
| | | { |
| | |   try |
| | |   { |
| | |     if (debugEnabled()) |
| | |       TRACER.debugInfo( |
| | |         "In " + this.replicationServer.getMonitorInstanceName() + |
| | |         " baseDn=" + baseDn + |
| | |         " waiting for " + expectedResponses + " expected monitor messages"); |
| | | |
| | |     boolean allPermitsAcquired = |
| | |       remoteMonitorResponsesSemaphore.tryAcquire( |
| | |         expectedResponses, |
| | |         5000L, TimeUnit.MILLISECONDS); |
| | | |
| | |     if (!allPermitsAcquired) |
| | |     { |
| | |       logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); |
| | |       // let's go on in best effort even with limited data received. |
| | |     } else |
| | |     { |
| | |       if (debugEnabled()) |
| | |         TRACER.debugInfo( |
| | |           "In " + this.replicationServer.getMonitorInstanceName() + |
| | |           " baseDn=" + baseDn + |
| | |           " Successfully received all " + expectedResponses + |
| | |           " expected monitor messages"); |
| | |     } |
| | |   } catch (InterruptedException e) |
| | |   { |
| | |     logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); |
| | |     // tryAcquire cleared the interrupt status when it threw; set it |
| | |     // again so the interruption is not silently lost. |
| | |     Thread.currentThread().interrupt(); |
| | |   } catch (Exception e) |
| | |   { |
| | |     logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); |
| | |   } finally |
| | |   { |
| | |     // A null semaphore marks the end of the current collection. |
| | |     remoteMonitorResponsesSemaphore = null; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Processes a Monitor message received from a remote Replication Server |
| | | * and stores the data received. |
| | | * |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "Receiving " + msg + " from " + msg.getsenderID() + |
| | | remoteMonitorResponsesSemaphore); |
| | | |
| | | if (remoteMonitorResponsesSemaphore == null) |
| | | { |
| | | // Let's ignore the remote monitor data just received |
| | | // since the computing processing has been ended. |
| | | // An error - probably a timeout - occurred that was already logged |
| | | logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get( |
| | | Short.toString(msg.getsenderID()))); |
| | | return; |
| | | } |
| | | "Receiving " + msg + " from " + msg.getsenderID()); |
| | | |
| | | try |
| | | { |
| | | synchronized (wrkMonitorData) |
| | | synchronized (monitorDataLock) |
| | | { |
| | | if (wrkMonitorData == null) |
| | | { |
| | | // This is a response for an earlier request whose computing is |
| | | // already complete. |
| | | logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get( |
| | | Short.toString(msg.getsenderID()))); |
| | | return; |
| | | } |
| | | // Here is the RS state : list <serverID, lastChangeNumber> |
| | | // For each LDAP Server, we keep the max CN across the RSes |
| | | ServerState replServerState = msg.getReplServerDbState(); |
| | |
| | | |
| | | // Decreases the number of expected responses and potentially |
| | | // wakes up the waiting requestor thread. |
| | | remoteMonitorResponsesSemaphore.release(); |
| | | replicationServer.responseReceived(); |
| | | |
| | | } catch (Exception e) |
| | | { |
| | |
| | | |
| | | // If an exception occurs while processing one of the expected messages, |
| | | // the processing is aborted and the waiting thread is woken up. |
| | | remoteMonitorResponsesSemaphore.notifyAll(); |
| | | replicationServer.responseReceivedAll(); |
| | | } |
| | | } |
| | | |