mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
26.57.2016 0e5de1f566819e9bdbf2dc0e654e16fb6a04a79b
ReplicationDomainMonitorData: corrected code for multi-threading case

ReplicationDomainMonitorData.java:
Extracted methods setMaxCsn0() and setFirstMissingDate0() to ensure correct behaviour in multi-threaded context.
In toString(), added the baseDN (added as a parameter of the constructor too).

Various code cleanups
3 files modified
154 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java 80 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java 70 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -23,85 +23,59 @@
import java.util.concurrent.TimeUnit;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.util.TimeThread;
import net.jcip.annotations.GuardedBy;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class maintains monitor data for a replication domain.
 */
/** This class maintains monitor data for a replication domain. */
class ReplicationDomainMonitor
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** TODO: Remote monitor data cache lifetime is 500ms/should be configurable. */
  private final long monitorDataLifeTime = 500;
  /** The replication domain monitored by this class. */
  private final ReplicationServerDomain domain;
  /** The monitor data consolidated over the topology. */
  private volatile ReplicationDomainMonitorData monitorData;
  /**
   * The monitor data consolidated over the topology.
   */
  private volatile ReplicationDomainMonitorData monitorData =
      new ReplicationDomainMonitorData();
  /**
   * This lock guards against multiple concurrent monitor data recalculation.
   */
  /** This lock guards against multiple concurrent monitor data recalculation. */
  private final Object pendingMonitorLock = new Object();
  /** Guarded by pendingMonitorLock. */
  @GuardedBy("pendingMonitorLock")
  private long monitorDataLastBuildDate;
  /**
   * The set of replication servers which are already known to be slow to send
   * monitor data.
   * <p>
   * Guarded by pendingMonitorLock.
   */
  private final Set<Integer> monitorDataLateServers = new HashSet<>();
  /** This lock serializes updates to the pending monitor data. */
  private final Object pendingMonitorDataLock = new Object();
  /**
   * Monitor data which is currently being calculated.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  /** Monitor data which is currently being calculated. */
  @GuardedBy("pendingMonitorDataLock")
  private ReplicationDomainMonitorData pendingMonitorData;
  /** The set of replication servers which are already known to be slow to send monitor data. */
  @GuardedBy("pendingMonitorDataLock")
  private final Set<Integer> monitorDataLateServers = new HashSet<>();
  /**
   * A set containing the IDs of servers from which we are currently expecting
   * monitor responses. When a response is received from a server we remove the
   * ID from this table, and count down the latch if the ID was in the table.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  @GuardedBy("pendingMonitorDataLock")
  private final Set<Integer> pendingMonitorDataServerIDs = new HashSet<>();
  /**
   * This latch is non-null and is used in order to count incoming responses as
   * they arrive. Since incoming response may arrive at any time, even when
   * there is no pending monitor request, access to the latch must be guarded.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  @GuardedBy("pendingMonitorDataLock")
  private CountDownLatch pendingMonitorDataLatch;
  /**
   * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
   */
  private final long monitorDataLifeTime = 500;
  /** The replication domain monitored by this class. */
  private final ReplicationServerDomain domain;
  /**
   * Builds an object of this class.
   *
   * @param replicationDomain
@@ -110,6 +84,12 @@
  public ReplicationDomainMonitor(ReplicationServerDomain replicationDomain)
  {
    this.domain = replicationDomain;
    this.monitorData = new ReplicationDomainMonitorData(getBaseDn());
  }
  private DN getBaseDn()
  {
    return domain.getBaseDN();
  }
  /**
@@ -149,7 +129,7 @@
          {
            // Clear the pending monitor data.
            pendingMonitorDataServerIDs.clear();
            pendingMonitorData = new ReplicationDomainMonitorData();
            pendingMonitorData = new ReplicationDomainMonitorData(baseDN);
            initializePendingMonitorData();
@@ -239,9 +219,7 @@
    return monitorData;
  }
  /**
   * Start collecting global monitoring information for the replication domain.
   */
  /** Start collecting global monitoring information for the replication domain. */
  private void initializePendingMonitorData()
  {
    // Let's process our directly connected DS
@@ -266,8 +244,7 @@
      }
      pendingMonitorData.setMaxCSN(maxCSN);
      pendingMonitorData.setLDAPServerState(serverId, dsState);
      pendingMonitorData.setFirstMissingDate(serverId,
          ds.getApproxFirstMissingDate());
      pendingMonitorData.setFirstMissingDate(serverId, ds.getApproxFirstMissingDate());
    }
    // Then initialize the max CSN for the LS that produced something
@@ -369,5 +346,4 @@
      }
    }
  }
}
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationDomainMonitorData.java
@@ -12,7 +12,7 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2009-2010 Sun Microsystems, Inc.
 * Portions Copyright 2012-2015 ForgeRock AS.
 * Portions Copyright 2012-2016 ForgeRock AS.
 */
package org.opends.server.replication.server;
@@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentMap;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.util.TimeThread;
@@ -44,17 +45,29 @@
   *   date of the first missing change.
   */
  /** For each LDAP server, its server state. */
  private ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>();
  /** A Map containing the ServerStates of each RS. */
  private ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>();
  /** For each LDAP server, the last(max) CSN it published. */
  private ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>();
  /** BaseDN being monitored. This field is only used for debugging purposes. */
  private final DN baseDN;
  /** For each LDAP server, its server state. This is the point-of-view of the DSs. */
  private final ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>();
  /** A Map containing the ServerStates of each RS. This is the point-of-view of the RSs. */
  private final ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>();
  /**
   * For each LDAP server, the last(max) CSN it published.
   * <p>
   * Union of the view from all the {@code ldapStates} and {@code rsStates}.
   */
  private final ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>();
  /** For each LDAP server, an approximation of the date of the first missing change. */
  private ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>();
  private ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>();
  private ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>();
  private final ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>();
  private final ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>();
  private final ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>();
  public ReplicationDomainMonitorData(DN baseDN)
  {
    this.baseDN = baseDN;
  }
  /**
   * Get an approximation of the latency delay of the replication.
@@ -193,15 +206,10 @@
    return lsiMissingChanges;
  }
  /**
   * Returns a <code>String</code> object representing this
   * object's value.
   * @return  a string representation of the value of this object in
   */
  @Override
  public String toString()
  {
    StringBuilder mds = new StringBuilder("Monitor data=\n");
    StringBuilder mds = new StringBuilder("Monitor data='").append(baseDN).append("'\n");
    // maxCSNs
    for (Entry<Integer, CSN> entry : maxCSNs.entrySet())
@@ -256,22 +264,28 @@
   */
  public void setMaxCSN(CSN newCSN)
  {
    if (newCSN == null)
    if (newCSN != null)
    {
      return;
      while (!setMaxCsn0(newCSN))
      {
        // try setting up the max CSN until the CSN is no longer the max one, or until it succeeds
      }
    }
    }
  private boolean setMaxCsn0(CSN newCSN)
  {
    int serverId = newCSN.getServerId();
    CSN currentMaxCSN = maxCSNs.get(serverId);
    if (currentMaxCSN == null)
    {
      maxCSNs.put(serverId, newCSN);
      return maxCSNs.putIfAbsent(serverId, newCSN) == null;
    }
    else if (newCSN.isNewerThan(currentMaxCSN))
    {
      // TODO JNR should we check for unsuccessful replace?
      maxCSNs.replace(serverId, newCSN);
      return maxCSNs.replace(serverId, currentMaxCSN, newCSN);
    }
    return true;
  }
  /**
@@ -312,15 +326,25 @@
   */
  public void setFirstMissingDate(int serverId, long newFmd)
  {
    while (!setFirstMissingDate0(serverId, newFmd))
    {
      // try setting up the first missing date
      // until the first missing date is no longer the min one, or until it succeeds
    }
  }
  public boolean setFirstMissingDate0(int serverId, long newFmd)
  {
    Long currentFmd = firstMissingDates.get(serverId);
    if (currentFmd == null)
    {
      firstMissingDates.put(serverId, newFmd);
      return firstMissingDates.putIfAbsent(serverId, newFmd) == null;
    }
    else if (newFmd != 0 && (newFmd < currentFmd || currentFmd == 0))
    {
      firstMissingDates.replace(serverId, newFmd);
      return firstMissingDates.replace(serverId, currentFmd, newFmd);
    }
    return true;
  }
  /**
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -842,6 +842,10 @@
        else if (msg instanceof UpdateMsg)
        {
          update = (UpdateMsg) msg;
          // If the replica is reset to an older state (server died, reset from a backup of day-1),
          // then its generator state must be adjusted back to what it was before.
          // Scary: what happens if the DS starts accepting updates
          // before the recovery is finished?
          generator.adjust(update.getCSN());
        }
        else if (msg instanceof InitializeRcvAckMsg)