From 0e5de1f566819e9bdbf2dc0e654e16fb6a04a79b Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 08 Aug 2016 08:05:17 +0000
Subject: [PATCH] ReplicationDomainMonitorData: corrected code for multi-threading case
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java | 80 +++++++++-----------------
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java | 70 +++++++++++++++-------
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java | 4 +
3 files changed, 79 insertions(+), 75 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
index 750e445..636cdec 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -23,85 +23,59 @@
import java.util.concurrent.TimeUnit;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.DN;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
-import org.forgerock.opendj.ldap.DN;
import org.opends.server.util.TimeThread;
+import net.jcip.annotations.GuardedBy;
+
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
-/**
- * This class maintains monitor data for a replication domain.
- */
+/** This class maintains monitor data for a replication domain. */
class ReplicationDomainMonitor
{
-
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ /** TODO: Remote monitor data cache lifetime is 500ms/should be configurable. */
+ private final long monitorDataLifeTime = 500;
+ /** The replication domain monitored by this class. */
+ private final ReplicationServerDomain domain;
+ /** The monitor data consolidated over the topology. */
+ private volatile ReplicationDomainMonitorData monitorData;
- /**
- * The monitor data consolidated over the topology.
- */
- private volatile ReplicationDomainMonitorData monitorData =
- new ReplicationDomainMonitorData();
-
- /**
- * This lock guards against multiple concurrent monitor data recalculation.
- */
+ /** This lock guards against multiple concurrent monitor data recalculation. */
private final Object pendingMonitorLock = new Object();
-
- /** Guarded by pendingMonitorLock. */
+ @GuardedBy("pendingMonitorLock")
private long monitorDataLastBuildDate;
- /**
- * The set of replication servers which are already known to be slow to send
- * monitor data.
- * <p>
- * Guarded by pendingMonitorLock.
- */
- private final Set<Integer> monitorDataLateServers = new HashSet<>();
-
/** This lock serializes updates to the pending monitor data. */
private final Object pendingMonitorDataLock = new Object();
-
- /**
- * Monitor data which is currently being calculated.
- * <p>
- * Guarded by pendingMonitorDataLock.
- */
+ /** Monitor data which is currently being calculated. */
+ @GuardedBy("pendingMonitorDataLock")
private ReplicationDomainMonitorData pendingMonitorData;
-
+ /** The set of replication servers which are already known to be slow to send monitor data. */
+ @GuardedBy("pendingMonitorDataLock")
+ private final Set<Integer> monitorDataLateServers = new HashSet<>();
/**
* A set containing the IDs of servers from which we are currently expecting
* monitor responses. When a response is received from a server we remove the
* ID from this table, and count down the latch if the ID was in the table.
- * <p>
- * Guarded by pendingMonitorDataLock.
*/
+ @GuardedBy("pendingMonitorDataLock")
private final Set<Integer> pendingMonitorDataServerIDs = new HashSet<>();
-
/**
* This latch is non-null and is used in order to count incoming responses as
* they arrive. Since incoming response may arrive at any time, even when
* there is no pending monitor request, access to the latch must be guarded.
- * <p>
- * Guarded by pendingMonitorDataLock.
*/
+ @GuardedBy("pendingMonitorDataLock")
private CountDownLatch pendingMonitorDataLatch;
/**
- * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
- */
- private final long monitorDataLifeTime = 500;
-
- /** The replication domain monitored by this class. */
- private final ReplicationServerDomain domain;
-
-
- /**
* Builds an object of this class.
*
* @param replicationDomain
@@ -110,6 +84,12 @@
public ReplicationDomainMonitor(ReplicationServerDomain replicationDomain)
{
this.domain = replicationDomain;
+ this.monitorData = new ReplicationDomainMonitorData(getBaseDn());
+ }
+
+ private DN getBaseDn()
+ {
+ return domain.getBaseDN();
}
/**
@@ -149,7 +129,7 @@
{
// Clear the pending monitor data.
pendingMonitorDataServerIDs.clear();
- pendingMonitorData = new ReplicationDomainMonitorData();
+ pendingMonitorData = new ReplicationDomainMonitorData(baseDN);
initializePendingMonitorData();
@@ -239,9 +219,7 @@
return monitorData;
}
- /**
- * Start collecting global monitoring information for the replication domain.
- */
+ /** Start collecting global monitoring information for the replication domain. */
private void initializePendingMonitorData()
{
// Let's process our directly connected DS
@@ -266,8 +244,7 @@
}
pendingMonitorData.setMaxCSN(maxCSN);
pendingMonitorData.setLDAPServerState(serverId, dsState);
- pendingMonitorData.setFirstMissingDate(serverId,
- ds.getApproxFirstMissingDate());
+ pendingMonitorData.setFirstMissingDate(serverId, ds.getApproxFirstMissingDate());
}
// Then initialize the max CSN for the LS that produced something
@@ -369,5 +346,4 @@
}
}
}
-
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
index a854678..f479b07 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
@@ -12,7 +12,7 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2012-2015 ForgeRock AS.
+ * Portions Copyright 2012-2016 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentMap;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.DN;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.util.TimeThread;
@@ -44,17 +45,29 @@
* date of the first missing change.
*/
- /** For each LDAP server, its server state. */
- private ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>();
- /** A Map containing the ServerStates of each RS. */
- private ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>();
- /** For each LDAP server, the last(max) CSN it published. */
- private ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>();
+ /** BaseDN being monitored. This field is only used for debugging purposes. */
+ private final DN baseDN;
+
+ /** For each LDAP server, its server state. This is the point-of-view of the DSs. */
+ private final ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>();
+ /** A Map containing the ServerStates of each RS. This is the point-of-view of the RSs. */
+ private final ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>();
+ /**
+ * For each LDAP server, the last(max) CSN it published.
+ * <p>
+ * Union of the view from all the {@code ldapStates} and {@code rsStates}.
+ */
+ private final ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>();
/** For each LDAP server, an approximation of the date of the first missing change. */
- private ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>();
- private ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>();
- private ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>();
+
+ public ReplicationDomainMonitorData(DN baseDN)
+ {
+ this.baseDN = baseDN;
+ }
/**
* Get an approximation of the latency delay of the replication.
@@ -193,15 +206,10 @@
return lsiMissingChanges;
}
- /**
- * Returns a <code>String</code> object representing this
- * object's value.
- * @return a string representation of the value of this object in
- */
@Override
public String toString()
{
- StringBuilder mds = new StringBuilder("Monitor data=\n");
+ StringBuilder mds = new StringBuilder("Monitor data='").append(baseDN).append("'\n");
// maxCSNs
for (Entry<Integer, CSN> entry : maxCSNs.entrySet())
@@ -256,22 +264,28 @@
*/
public void setMaxCSN(CSN newCSN)
{
- if (newCSN == null)
+ if (newCSN != null)
{
- return;
+ while (!setMaxCsn0(newCSN))
+ {
+ // try setting up the max CSN until the CSN is no longer the max one, or until it succeeds
+ }
}
+ }
+ private boolean setMaxCsn0(CSN newCSN)
+ {
int serverId = newCSN.getServerId();
CSN currentMaxCSN = maxCSNs.get(serverId);
if (currentMaxCSN == null)
{
- maxCSNs.put(serverId, newCSN);
+ return maxCSNs.putIfAbsent(serverId, newCSN) == null;
}
else if (newCSN.isNewerThan(currentMaxCSN))
{
- // TODO JNR should we check for unsuccessful replace?
- maxCSNs.replace(serverId, newCSN);
+ return maxCSNs.replace(serverId, currentMaxCSN, newCSN);
}
+ return true;
}
/**
@@ -312,15 +326,25 @@
*/
public void setFirstMissingDate(int serverId, long newFmd)
{
+ while (!setFirstMissingDate0(serverId, newFmd))
+ {
+ // try setting up the first missing date
+ // until the first missing date is no longer the min one, or until it succeeds
+ }
+ }
+
+ public boolean setFirstMissingDate0(int serverId, long newFmd)
+ {
Long currentFmd = firstMissingDates.get(serverId);
if (currentFmd == null)
{
- firstMissingDates.put(serverId, newFmd);
+ return firstMissingDates.putIfAbsent(serverId, newFmd) == null;
}
else if (newFmd != 0 && (newFmd < currentFmd || currentFmd == 0))
{
- firstMissingDates.replace(serverId, newFmd);
+ return firstMissingDates.replace(serverId, currentFmd, newFmd);
}
+ return true;
}
/**
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
index 688cc98..3e52dcd 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -842,6 +842,10 @@
else if (msg instanceof UpdateMsg)
{
update = (UpdateMsg) msg;
+ // If the replica is reset to an older state (server died, reset from a backup of day-1),
+ // then its generator state must be adjusted back to what it was before.
+ // Scary: what happens if the DS starts accepting updates
+ // before the recovery is finished?
generator.adjust(update.getCSN());
}
else if (msg instanceof InitializeRcvAckMsg)
--
Gitblit v1.10.0