opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -23,85 +23,59 @@ import java.util.concurrent.TimeUnit; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.ldap.DN; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.forgerock.opendj.ldap.DN; import org.opends.server.util.TimeThread; import net.jcip.annotations.GuardedBy; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.*; /** * This class maintains monitor data for a replication domain. */ /** This class maintains monitor data for a replication domain. */ class ReplicationDomainMonitor { private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); /** TODO: Remote monitor data cache lifetime is 500ms/should be configurable. */ private final long monitorDataLifeTime = 500; /** The replication domain monitored by this class. */ private final ReplicationServerDomain domain; /** The monitor data consolidated over the topology. */ private volatile ReplicationDomainMonitorData monitorData; /** * The monitor data consolidated over the topology. */ private volatile ReplicationDomainMonitorData monitorData = new ReplicationDomainMonitorData(); /** * This lock guards against multiple concurrent monitor data recalculation. */ /** This lock guards against multiple concurrent monitor data recalculation. */ private final Object pendingMonitorLock = new Object(); /** Guarded by pendingMonitorLock. */ @GuardedBy("pendingMonitorLock") private long monitorDataLastBuildDate; /** * The set of replication servers which are already known to be slow to send * monitor data. * <p> * Guarded by pendingMonitorLock. */ private final Set<Integer> monitorDataLateServers = new HashSet<>(); /** This lock serializes updates to the pending monitor data. */ private final Object pendingMonitorDataLock = new Object(); /** * Monitor data which is currently being calculated. * <p> * Guarded by pendingMonitorDataLock. */ /** Monitor data which is currently being calculated. */ @GuardedBy("pendingMonitorDataLock") private ReplicationDomainMonitorData pendingMonitorData; /** The set of replication servers which are already known to be slow to send monitor data. */ @GuardedBy("pendingMonitorDataLock") private final Set<Integer> monitorDataLateServers = new HashSet<>(); /** * A set containing the IDs of servers from which we are currently expecting * monitor responses. When a response is received from a server we remove the * ID from this table, and count down the latch if the ID was in the table. * <p> * Guarded by pendingMonitorDataLock. */ @GuardedBy("pendingMonitorDataLock") private final Set<Integer> pendingMonitorDataServerIDs = new HashSet<>(); /** * This latch is non-null and is used in order to count incoming responses as * they arrive. Since incoming response may arrive at any time, even when * there is no pending monitor request, access to the latch must be guarded. * <p> * Guarded by pendingMonitorDataLock. */ @GuardedBy("pendingMonitorDataLock") private CountDownLatch pendingMonitorDataLatch; /** * TODO: Remote monitor data cache lifetime is 500ms/should be configurable. */ private final long monitorDataLifeTime = 500; /** The replication domain monitored by this class. */ private final ReplicationServerDomain domain; /** * Builds an object of this class. * * @param replicationDomain @@ -110,6 +84,12 @@ public ReplicationDomainMonitor(ReplicationServerDomain replicationDomain) { this.domain = replicationDomain; this.monitorData = new ReplicationDomainMonitorData(getBaseDn()); } private DN getBaseDn() { return domain.getBaseDN(); } /** @@ -149,7 +129,7 @@ { // Clear the pending monitor data. pendingMonitorDataServerIDs.clear(); pendingMonitorData = new ReplicationDomainMonitorData(); pendingMonitorData = new ReplicationDomainMonitorData(baseDN); initializePendingMonitorData(); @@ -239,9 +219,7 @@ return monitorData; } /** * Start collecting global monitoring information for the replication domain. */ /** Start collecting global monitoring information for the replication domain. */ private void initializePendingMonitorData() { // Let's process our directly connected DS @@ -266,8 +244,7 @@ } pendingMonitorData.setMaxCSN(maxCSN); pendingMonitorData.setLDAPServerState(serverId, dsState); pendingMonitorData.setFirstMissingDate(serverId, ds.getApproxFirstMissingDate()); pendingMonitorData.setFirstMissingDate(serverId, ds.getApproxFirstMissingDate()); } // Then initialize the max CSN for the LS that produced something @@ -369,5 +346,4 @@ } } } } opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
@@ -12,7 +12,7 @@ * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2012-2015 ForgeRock AS. * Portions Copyright 2012-2016 ForgeRock AS. */ package org.opends.server.replication.server; @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.ldap.DN; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.util.TimeThread; @@ -44,17 +45,29 @@ * date of the first missing change. */ /** For each LDAP server, its server state. */ private ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>(); /** A Map containing the ServerStates of each RS. */ private ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>(); /** For each LDAP server, the last(max) CSN it published. */ private ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>(); /** BaseDN being monitored. This field is only used for debugging purposes. */ private final DN baseDN; /** For each LDAP server, its server state. This is the point-of-view of the DSs. */ private final ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>(); /** A Map containing the ServerStates of each RS. This is the point-of-view of the RSs. */ private final ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>(); /** * For each LDAP server, the last(max) CSN it published. * <p> * Union of the view from all the {@code ldapStates} and {@code rsStates}. */ private final ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>(); /** For each LDAP server, an approximation of the date of the first missing change. */ private ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>(); private ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>(); private ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>(); public ReplicationDomainMonitorData(DN baseDN) { this.baseDN = baseDN; } /** * Get an approximation of the latency delay of the replication. @@ -193,15 +206,10 @@ return lsiMissingChanges; } /** * Returns a <code>String</code> object representing this * object's value. * @return a string representation of the value of this object in */ @Override public String toString() { StringBuilder mds = new StringBuilder("Monitor data=\n"); StringBuilder mds = new StringBuilder("Monitor data='").append(baseDN).append("'\n"); // maxCSNs for (Entry<Integer, CSN> entry : maxCSNs.entrySet()) @@ -256,22 +264,28 @@ */ public void setMaxCSN(CSN newCSN) { if (newCSN == null) if (newCSN != null) { return; while (!setMaxCsn0(newCSN)) { // try setting up the max CSN until the CSN is no longer the max one, or until it succeeds } } } private boolean setMaxCsn0(CSN newCSN) { int serverId = newCSN.getServerId(); CSN currentMaxCSN = maxCSNs.get(serverId); if (currentMaxCSN == null) { maxCSNs.put(serverId, newCSN); return maxCSNs.putIfAbsent(serverId, newCSN) == null; } else if (newCSN.isNewerThan(currentMaxCSN)) { // TODO JNR should we check for unsuccessful replace? maxCSNs.replace(serverId, newCSN); return maxCSNs.replace(serverId, currentMaxCSN, newCSN); } return true; } /** @@ -312,15 +326,25 @@ */ public void setFirstMissingDate(int serverId, long newFmd) { while (!setFirstMissingDate0(serverId, newFmd)) { // try setting up the first missing date // until the first missing date is no longer the min one, or until it succeeds } } public boolean setFirstMissingDate0(int serverId, long newFmd) { Long currentFmd = firstMissingDates.get(serverId); if (currentFmd == null) { firstMissingDates.put(serverId, newFmd); return firstMissingDates.putIfAbsent(serverId, newFmd) == null; } else if (newFmd != 0 && (newFmd < currentFmd || currentFmd == 0)) { firstMissingDates.replace(serverId, newFmd); return firstMissingDates.replace(serverId, currentFmd, newFmd); } return true; } /** opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -842,6 +842,10 @@ else if (msg instanceof UpdateMsg) { update = (UpdateMsg) msg; // If the replica is reset to an older state (server died, reset from a backup of day-1), // then its generator state must be adjusted back to what it was before. // Scary: what happens if the DS starts accepting updates // before the recovery is finished? generator.adjust(update.getCSN()); } else if (msg instanceof InitializeRcvAckMsg)