opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -62,8 +62,6 @@ * date of the first missing change. */ /* The date of the last time they have been elaborated */ private long buildDate = 0; // For each LDAP server, its server state private ConcurrentHashMap<Short, ServerState> LDAPStates = @@ -103,7 +101,7 @@ { Long afmd = fmd.get(serverId); if ((afmd != null) && (afmd>0)) return ((this.getBuildDate() - afmd)/1000); return (TimeThread.getTime() - afmd)/1000; else return 0; } @@ -243,7 +241,6 @@ TRACER.debugInfo( "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds); } this.setBuildDate(TimeThread.getTime()); } /** @@ -255,7 +252,6 @@ { String mds = "Monitor data=\n"; mds+= "Build date=" + this.getBuildDate(); // RS data Iterator<Short> rsite = fmRSDate.keySet().iterator(); while (rsite.hasNext()) @@ -281,10 +277,9 @@ ServerState ss = LDAPStates.get(sid); mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString() + "] afmd=" + this.getApproxFirstMissingDate(sid); if (getBuildDate()>0) { mds += " missingDelay=" + this.getApproxDelay(sid); } mds +=" missingCount=" + missingChanges.get(sid); } @@ -304,24 +299,6 @@ } /** * Sets the build date of the data. * @param buildDate The date. */ public void setBuildDate(long buildDate) { this.buildDate = buildDate; } /** * Returns the build date of the data. * @return The date. */ public long getBuildDate() { return buildDate; } /** * From a provided state, sets the max CN of the monitor data. * @param state the provided state. */ opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -65,12 +67,14 @@ import org.opends.server.types.BackupConfig; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.LDIFExportConfig; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.RestoreConfig; import org.opends.server.types.ResultCode; import org.opends.server.util.LDIFReader; import org.opends.server.util.TimeThread; import com.sleepycat.je.DatabaseException; @@ -1094,4 +1098,115 @@ { return replicationPort; } // TODO: Remote monitor data cache lifetime is 500ms/should be configurable private long monitorDataLifeTime = 500; /* The date of the last time they have been elaborated */ private long monitorDataLastBuildDate = 0; /* Search op on monitor data is processed by a worker thread. * Requests are sent to the other RS,and responses are received by the * listener threads. * The worker thread is awoke on this semaphore, or on timeout. */ Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0); /** * Trigger the computation of the Global Monitoring Data. * This should be called by all the MonitorProviders that need * the global monitoring data to be updated before they can * publish their information to cn=monitor. * * This method will trigger the update of all the global monitoring * information of all the base-DNs of this replication Server. * * @throws DirectoryException If the computation cannot be achieved. 
*/
  public void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return;
    }

    // Start a fresh round: forget any abort signalled for an earlier round
    // and discard the permits left over from late or surplus releases.
    monitorDataAborted = false;
    expectedMonitorResponses = 0;
    remoteMonitorResponsesSemaphore.drainPermits();

    int count = 0;
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
    }

    // Wait for responses
    waitMonitorDataResponses(count);

    for (ReplicationServerDomain domain : baseDNs.values())
    {
      domain.completeMonitorData();
    }
  }

  /*
   * Number of permits the worker thread is currently trying to acquire.
   * Published before the worker blocks so that responseReceivedAll() can
   * release exactly enough permits to wake it up.
   */
  private volatile int expectedMonitorResponses = 0;

  /*
   * Set by responseReceivedAll() when the processing of a response failed;
   * reset at the beginning of each computation round.
   */
  private volatile boolean monitorDataAborted = false;

  /**
   * Wait for the expected count of received MonitorMsg.
   * The wait ends when all the expected responses have been received, when
   * the collection is aborted through {@link #responseReceivedAll()}, or
   * after a 5 seconds timeout, whichever comes first. In the first three
   * cases the build date of the monitor data is updated.
   *
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");

      // Publish the count BEFORE testing the abort flag: the aborting thread
      // sets the flag and then reads the count, so either this thread sees
      // the flag or the aborting thread sees the count and releases it.
      expectedMonitorResponses = expectedResponses;

      boolean allPermitsAcquired =
        !monitorDataAborted
        && remoteMonitorResponsesSemaphore.tryAcquire(
            expectedResponses, 5000L, TimeUnit.MILLISECONDS)
        // Permits released by an abort must not be mistaken for responses.
        && !monitorDataAborted;

      monitorDataLastBuildDate = TimeThread.getTime();

      if (!allPermitsAcquired)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
        // let's go on in best effort even with limited data received.
      }
      else
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    }
    catch (InterruptedException e)
    {
      // Restore the interrupt status so that the caller can observe it.
      Thread.currentThread().interrupt();
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    }
    catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    }
    finally
    {
      // Nobody is waiting any more: a late abort must not add permits.
      expectedMonitorResponses = 0;
    }
  }

  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   */
  public void responseReceived()
  {
    remoteMonitorResponsesSemaphore.release();
  }

  /**
   * This should be called when the Monitoring has failed and the
   * Worker thread that is waiting for the result should be awaken.
   */
  public void responseReceivedAll()
  {
    // Threads blocked in Semaphore.tryAcquire() are not waiting on the
    // semaphore's object monitor, so Object.notifyAll() cannot wake them
    // (and throws IllegalMonitorStateException when the monitor is not
    // held). Flag the abort, then release the permits the waiter needs.
    monitorDataAborted = true;
    remoteMonitorResponsesSemaphore.release(expectedMonitorResponses);
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.Iterator; @@ -65,7 +64,6 @@ import org.opends.server.types.Attributes; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; import org.opends.server.util.TimeThread; import com.sleepycat.je.DatabaseException; import java.util.Timer; import java.util.TimerTask; @@ -146,20 +144,12 @@ /* Monitor data management */ // TODO: Remote monitor data cache lifetime is 500ms/should be configurable private long monitorDataLifeTime = 500; /* Search op on monitor data is processed by a worker thread. * Requests are sent to the other RS,and responses are received by the * listener threads. * The worker thread is awoke on this semaphore, or on timeout. */ Semaphore remoteMonitorResponsesSemaphore; /** * The monitor data consolidated over the topology. */ private MonitorData monitorData = new MonitorData(); private MonitorData wrkMonitorData; private Object monitorDataLock = new Object(); /** * The needed info for each received assured update message we are waiting @@ -2255,19 +2245,26 @@ synchronized protected MonitorData computeMonitorData() throws DirectoryException { if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime()) { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " getRemoteMonitorData in cache"); // The current data are still valid. No need to renew them. // Update the monitorData of all domains if this was necessary. replicationServer.computeMonitorData(); return monitorData; } wrkMonitorData = new MonitorData(); synchronized (wrkMonitorData) /** * Start collecting global monitoring information for this * ReplicationServerDomain. * * @return The number of response that should come back. 
* * @throws DirectoryException In case the monitoring information could * not be collected. */ int initializeMonitorData() throws DirectoryException { synchronized (monitorDataLock) { wrkMonitorData = new MonitorData(); if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + @@ -2324,28 +2321,20 @@ wrkMonitorData.toString()); } // Send Request to the other Replication Servers if (remoteMonitorResponsesSemaphore == null) { remoteMonitorResponsesSemaphore = new Semaphore(0); short requestCnt = sendMonitorDataRequest(); // Wait reponses from them or timeout waitMonitorDataResponses(requestCnt); } else { // The processing of renewing the monitor cache is already running // We'll make it sleeping until the end // TODO: unit test for this case. while (remoteMonitorResponsesSemaphore != null) { waitMonitorDataResponses(1); } // Send the request for remote monitor data to the return sendMonitorDataRequest(); } /** * Complete all the calculation when all monitoring information * has been received. */ void completeMonitorData() { wrkMonitorData.completeComputing(); // Store the new computed data as the reference synchronized (monitorData) synchronized (monitorDataLock) { // Now we have the expected answers or an error occurred monitorData = wrkMonitorData; @@ -2356,7 +2345,6 @@ " baseDn=" + baseDn + " *** Computed MonitorData: " + monitorData.toString()); } return monitorData; } /** @@ -2364,10 +2352,10 @@ * @return the number of requests sent. * @throws DirectoryException when a problem occurs. */ protected short sendMonitorDataRequest() protected int sendMonitorDataRequest() throws DirectoryException { short sent = 0; int sent = 0; try { for (ServerHandler rs : replicationServers.values()) @@ -2389,49 +2377,6 @@ } /** * Wait for the expected count of received MonitorMsg. * @param expectedResponses The number of expected answers. * @throws DirectoryException When an error occurs. 
*/ protected void waitMonitorDataResponses(int expectedResponses) throws DirectoryException { try { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " waiting for " + expectedResponses + " expected monitor messages"); boolean allPermitsAcquired = remoteMonitorResponsesSemaphore.tryAcquire( expectedResponses, (long) 5000, TimeUnit.MILLISECONDS); if (!allPermitsAcquired) { logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); // let's go on in best effort even with limited data received. } else { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " Successfully received all " + expectedResponses + " expected monitor messages"); } } catch (Exception e) { logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); } finally { remoteMonitorResponsesSemaphore = null; } } /** * Processes a Monitor message receives from a remote Replication Server * and stores the data received. * @@ -2442,23 +2387,20 @@ if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "Receiving " + msg + " from " + msg.getsenderID() + remoteMonitorResponsesSemaphore); "Receiving " + msg + " from " + msg.getsenderID()); if (remoteMonitorResponsesSemaphore == null) try { // Let's ignore the remote monitor data just received // since the computing processing has been ended. // An error - probably a timemout - occurred that was already logged synchronized (monitorDataLock) { if (wrkMonitorData == null) { // This is a response for an earlier request whose computing is // already complete. 
logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get( Short.toString(msg.getsenderID()))); return; } try { synchronized (wrkMonitorData) { // Here is the RS state : list <serverID, lastChangeNumber> // For each LDAP Server, we keep the max CN across the RSes ServerState replServerState = msg.getReplServerDbState(); @@ -2523,7 +2465,7 @@ // Decreases the number of expected responses and potentially // wakes up the waiting requestor thread. remoteMonitorResponsesSemaphore.release(); replicationServer.responseReceived(); } catch (Exception e) { @@ -2532,7 +2474,7 @@ // If an exception occurs while processing one of the expected message, // the processing is aborted and the waiting thread is awoke. remoteMonitorResponsesSemaphore.notifyAll(); replicationServer.responseReceivedAll(); } } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -2064,6 +2064,18 @@ long delay = md.getApproxDelay(serverId); attributes.add(Attributes.create("approximate-delay", String .valueOf(delay))); /* get the Server State */ AttributeBuilder builder = new AttributeBuilder("server-state"); ServerState state = md.getLDAPServerState(serverId); if (state != null) { for (String str : state.toStringSet()) { builder.add(str); } attributes.add(builder.toAttribute()); } } else { @@ -2071,6 +2083,18 @@ long missingChanges = md.getMissingChangesRS(serverId); attributes.add(Attributes.create("missing-changes", String .valueOf(missingChanges))); /* get the Server State */ AttributeBuilder builder = new AttributeBuilder("server-state"); ServerState state = md.getRSStates(serverId); if (state != null) { for (String str : state.toStringSet()) { builder.add(str); } attributes.add(builder.toAttribute()); } } } catch (Exception e) @@ -2131,14 +2155,6 @@ attributes.add(Attributes.create("current-rcv-window", String .valueOf(rcvWindow))); /* get the Server State */ AttributeBuilder builder = new AttributeBuilder("server-state"); for (String str : serverState.toStringSet()) { builder.add(str); } attributes.add(builder.toAttribute()); // Encryption attributes.add(Attributes.create("ssl-encryption", String .valueOf(session.isEncrypted()))); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -175,7 +175,7 @@ ServerState state2 = states1.get(domain2ServerId); assertNotNull(state2, "getReplicaStates is not showing DS2"); Map<Short, ServerState> states2 = domain1.getReplicaStates(); Map<Short, ServerState> states2 = domain2.getReplicaStates(); ServerState state1 = states2.get(domain1ServerId); assertNotNull(state1, "getReplicaStates is not showing DS1");