From 0e5de1f566819e9bdbf2dc0e654e16fb6a04a79b Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 08 Aug 2016 08:05:17 +0000
Subject: [PATCH] ReplicationDomainMonitorData: corrected code for multi-threading case

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java     |   80 +++++++++-----------------
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java |   70 +++++++++++++++-------
 opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java           |    4 +
 3 files changed, 79 insertions(+), 75 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
index 750e445..636cdec 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -23,85 +23,59 @@
 import java.util.concurrent.TimeUnit;
 
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.DN;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.MonitorRequestMsg;
-import org.forgerock.opendj.ldap.DN;
 import org.opends.server.util.TimeThread;
 
+import net.jcip.annotations.GuardedBy;
+
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.*;
 
-/**
- * This class maintains monitor data for a replication domain.
- */
+/** This class maintains monitor data for a replication domain. */
 class ReplicationDomainMonitor
 {
-
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
+  /** TODO: Remote monitor data cache lifetime is 500ms/should be configurable. */
+  private final long monitorDataLifeTime = 500;
+  /** The replication domain monitored by this class. */
+  private final ReplicationServerDomain domain;
+  /** The monitor data consolidated over the topology. */
+  private volatile ReplicationDomainMonitorData monitorData;
 
-  /**
-   * The monitor data consolidated over the topology.
-   */
-  private volatile ReplicationDomainMonitorData monitorData =
-      new ReplicationDomainMonitorData();
-
-  /**
-   * This lock guards against multiple concurrent monitor data recalculation.
-   */
+  /** This lock guards against multiple concurrent monitor data recalculation. */
   private final Object pendingMonitorLock = new Object();
-
-  /** Guarded by pendingMonitorLock. */
+  @GuardedBy("pendingMonitorLock")
   private long monitorDataLastBuildDate;
 
-  /**
-   * The set of replication servers which are already known to be slow to send
-   * monitor data.
-   * <p>
-   * Guarded by pendingMonitorLock.
-   */
-  private final Set<Integer> monitorDataLateServers = new HashSet<>();
-
   /** This lock serializes updates to the pending monitor data. */
   private final Object pendingMonitorDataLock = new Object();
-
-  /**
-   * Monitor data which is currently being calculated.
-   * <p>
-   * Guarded by pendingMonitorDataLock.
-   */
+  /** Monitor data which is currently being calculated. */
+  @GuardedBy("pendingMonitorDataLock")
   private ReplicationDomainMonitorData pendingMonitorData;
-
+  /** The set of replication servers which are already known to be slow to send monitor data. */
+  @GuardedBy("pendingMonitorDataLock")
+  private final Set<Integer> monitorDataLateServers = new HashSet<>();
   /**
    * A set containing the IDs of servers from which we are currently expecting
    * monitor responses. When a response is received from a server we remove the
    * ID from this table, and count down the latch if the ID was in the table.
-   * <p>
-   * Guarded by pendingMonitorDataLock.
    */
+  @GuardedBy("pendingMonitorDataLock")
   private final Set<Integer> pendingMonitorDataServerIDs = new HashSet<>();
-
   /**
    * This latch is non-null and is used in order to count incoming responses as
    * they arrive. Since incoming response may arrive at any time, even when
    * there is no pending monitor request, access to the latch must be guarded.
-   * <p>
-   * Guarded by pendingMonitorDataLock.
    */
+  @GuardedBy("pendingMonitorDataLock")
   private CountDownLatch pendingMonitorDataLatch;
 
   /**
-   * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
-   */
-  private final long monitorDataLifeTime = 500;
-
-  /** The replication domain monitored by this class. */
-  private final ReplicationServerDomain domain;
-
-
-  /**
    * Builds an object of this class.
    *
    * @param replicationDomain
@@ -110,6 +84,12 @@
   public ReplicationDomainMonitor(ReplicationServerDomain replicationDomain)
   {
     this.domain = replicationDomain;
+    this.monitorData = new ReplicationDomainMonitorData(getBaseDn());
+  }
+
+  private DN getBaseDn()
+  {
+    return domain.getBaseDN();
   }
 
   /**
@@ -149,7 +129,7 @@
           {
             // Clear the pending monitor data.
             pendingMonitorDataServerIDs.clear();
-            pendingMonitorData = new ReplicationDomainMonitorData();
+            pendingMonitorData = new ReplicationDomainMonitorData(baseDN);
 
             initializePendingMonitorData();
 
@@ -239,9 +219,7 @@
     return monitorData;
   }
 
-  /**
-   * Start collecting global monitoring information for the replication domain.
-   */
+  /** Start collecting global monitoring information for the replication domain. */
   private void initializePendingMonitorData()
   {
     // Let's process our directly connected DS
@@ -266,8 +244,7 @@
       }
       pendingMonitorData.setMaxCSN(maxCSN);
       pendingMonitorData.setLDAPServerState(serverId, dsState);
-      pendingMonitorData.setFirstMissingDate(serverId,
-          ds.getApproxFirstMissingDate());
+      pendingMonitorData.setFirstMissingDate(serverId, ds.getApproxFirstMissingDate());
     }
 
     // Then initialize the max CSN for the LS that produced something
@@ -369,5 +346,4 @@
       }
     }
   }
-
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
index a854678..f479b07 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2012-2015 ForgeRock AS.
+ * Portions Copyright 2012-2016 ForgeRock AS.
  */
 package org.opends.server.replication.server;
 
@@ -22,6 +22,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.DN;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.util.TimeThread;
@@ -44,17 +45,29 @@
    *   date of the first missing change.
    */
 
-  /** For each LDAP server, its server state. */
-  private ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>();
-  /** A Map containing the ServerStates of each RS. */
-  private ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>();
-  /** For each LDAP server, the last(max) CSN it published. */
-  private ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>();
+  /** BaseDN being monitored. This field is only used for debugging purposes. */
+  private final DN baseDN;
+
+  /** For each LDAP server, its server state. This is the point-of-view of the DSs. */
+  private final ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>();
+  /** A Map containing the ServerStates of each RS. This is the point-of-view of the RSs. */
+  private final ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>();
+  /**
+   * For each LDAP server, the last(max) CSN it published.
+   * <p>
+   * Union of the view from all the {@code ldapStates} and {@code rsStates}.
+   */
+  private final ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>();
 
   /** For each LDAP server, an approximation of the date of the first missing change. */
-  private ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>();
-  private ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>();
-  private ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>();
+
+  public ReplicationDomainMonitorData(DN baseDN)
+  {
+    this.baseDN = baseDN;
+  }
 
   /**
    * Get an approximation of the latency delay of the replication.
@@ -193,15 +206,10 @@
     return lsiMissingChanges;
   }
 
-  /**
-   * Returns a <code>String</code> object representing this
-   * object's value.
-   * @return  a string representation of the value of this object in
-   */
   @Override
   public String toString()
   {
-    StringBuilder mds = new StringBuilder("Monitor data=\n");
+    StringBuilder mds = new StringBuilder("Monitor data='").append(baseDN).append("'\n");
 
     // maxCSNs
     for (Entry<Integer, CSN> entry : maxCSNs.entrySet())
@@ -256,22 +264,28 @@
    */
   public void setMaxCSN(CSN newCSN)
   {
-    if (newCSN == null)
+    if (newCSN != null)
     {
-      return;
+      while (!setMaxCsn0(newCSN))
+      {
+        // try setting up the max CSN until the CSN is no longer the max one, or until it succeeds
+      }
     }
+  }
 
+  private boolean setMaxCsn0(CSN newCSN)
+  {
     int serverId = newCSN.getServerId();
     CSN currentMaxCSN = maxCSNs.get(serverId);
     if (currentMaxCSN == null)
     {
-      maxCSNs.put(serverId, newCSN);
+      return maxCSNs.putIfAbsent(serverId, newCSN) == null;
     }
     else if (newCSN.isNewerThan(currentMaxCSN))
     {
-      // TODO JNR should we check for unsuccessful replace?
-      maxCSNs.replace(serverId, newCSN);
+      return maxCSNs.replace(serverId, currentMaxCSN, newCSN);
     }
+    return true;
   }
 
   /**
@@ -312,15 +326,25 @@
    */
   public void setFirstMissingDate(int serverId, long newFmd)
   {
+    while (!setFirstMissingDate0(serverId, newFmd))
+    {
+      // try setting up the first missing date
+      // until the first missing date is no longer the min one, or until it succeeds
+    }
+  }
+
+  public boolean setFirstMissingDate0(int serverId, long newFmd)
+  {
     Long currentFmd = firstMissingDates.get(serverId);
     if (currentFmd == null)
     {
-      firstMissingDates.put(serverId, newFmd);
+      return firstMissingDates.putIfAbsent(serverId, newFmd) == null;
     }
     else if (newFmd != 0 && (newFmd < currentFmd || currentFmd == 0))
     {
-      firstMissingDates.replace(serverId, newFmd);
+      return firstMissingDates.replace(serverId, currentFmd, newFmd);
     }
+    return true;
   }
 
   /**
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
index 688cc98..3e52dcd 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -842,6 +842,10 @@
         else if (msg instanceof UpdateMsg)
         {
           update = (UpdateMsg) msg;
+          // If the replica is reset to an older state (server died, reset from a backup of day-1),
+          // then its generator state must be adjusted back to what it was before.
+          // Scary: what happens if the DS starts accepting updates
+          // before the recovery is finished?
           generator.adjust(update.getCSN());
         }
         else if (msg instanceof InitializeRcvAckMsg)

--
Gitblit v1.10.0