mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.08.2014 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -173,11 +173,7 @@
  // @NotNull // for the reference
  private final AtomicReference<ConnectedRS> connectedRS =
      new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
  /**
   * Our replication domain.
   * <p>
   * Can be null for unit test purpose.
   */
  /** Our replication domain. */
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
@@ -217,25 +213,14 @@
  /*
   * Properties for the last topology info received from the network.
   */
  /**
   * Info for other DSs.
   * <p>
   * Warning: does not contain info for us (for our server id)
   */
  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
  private volatile long generationID;
  /** Contains the last known state of the replication topology. */
  private final AtomicReference<Topology> topology =
      new AtomicReference<Topology>(new Topology());
  /** <pre>@GuardedBy("this")</pre>. */
  private volatile int updateDoneCount = 0;
  private volatile boolean connectRequiresRecovery = false;
  /**
   * The map of replication server info initialized at connection time and
   * regularly updated. This is used to decide to which best suitable
   * replication server one wants to connect. Key: replication server id Value:
   * replication server info for the matching replication server id
   */
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
@@ -266,19 +251,16 @@
   * @param state The ServerState that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param config The configuration to use.
   * @param generationId The generationId for the server associated to the
   * provided serverId and for the domain associated to the provided baseDN.
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
      ServerState state, ReplicationDomainCfg config, long generationId,
      ServerState state, ReplicationDomainCfg config,
      ReplSessionSecurity replSessionSecurity)
  {
    this.domain = replicationDomain;
    this.state = state;
    this.config = config;
    this.replSessionSecurity = replSessionSecurity;
    this.generationID = generationId;
    this.rcvWindow = getMaxRcvWindow();
    this.halfRcvWindow = rcvWindow / 2;
@@ -352,8 +334,7 @@
   */
  private long getGenerationID()
  {
    generationID = domain.getGenerationID();
    return generationID;
    return domain.getGenerationID();
  }
  /**
@@ -362,38 +343,7 @@
   */
  public void setGenerationID(long generationID)
  {
    this.generationID = generationID;
  }
  /**
   * Sets the locally configured flag for the passed ReplicationServerInfo
   * object, analyzing the local configuration.
   * @param rsInfo the Replication server to check and update
   */
  private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
  {
    // Determine if the passed ReplicationServerInfo has a URL that is present
    // in the locally configured replication servers
    String rsUrl = rsInfo.getServerURL();
    if (rsUrl == null)
    {
      // The ReplicationServerInfo has been generated from a server with
      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
      // ignore this server as we do not know how to connect to it
      rsInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : getReplicationServerUrls())
    {
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
        // This RS is locally configured, mark this
        rsInfo.setLocallyConfigured(true);
        rsInfo.setServerURL(serverUrl);
        return;
      }
    }
    rsInfo.setLocallyConfigured(false);
    domain.setGenerationID(generationID);
  }
  /**
@@ -485,7 +435,7 @@
      // Unsupported message type: should not happen
      throw new IllegalArgumentException("Unexpected PDU type: "
          + msg.getClass().getName() + " :\n" + msg);
          + msg.getClass().getName() + ":\n" + msg);
    }
    /**
@@ -733,8 +683,10 @@
    @Override
    public String toString()
    {
      return "Url:" + getServerURL() + " ServerId:" + getServerId()
          + " GroupId:" + getGroupId();
      return "ReplServerInfo Url:" + getServerURL()
          + " ServerId:" + getServerId()
          + " GroupId:" + getGroupId()
          + " connectedDSs:" + connectedDSs;
    }
  }
@@ -860,9 +812,11 @@
            + "elect the preferred one");
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      Map<Integer, ReplicationServerInfo> rsInfos =
          collectReplicationServersInfo();
      computeNewTopology(toRSInfos(rsInfos));
      if (replicationServerInfos.isEmpty())
      if (rsInfos.isEmpty())
      {
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
@@ -870,7 +824,7 @@
      {
        // At least one server answered, find the best one.
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            replicationServerInfos, serverId, getGroupId(), getGenerationID());
            rsInfos, serverId, getGroupId(), getGenerationID());
        // Best found, now initialize connection to this one (handshake phase 1)
        if (logger.isTraceEnabled())
@@ -886,8 +840,7 @@
          Update replication server info with potentially more up to date
          data (server state for instance may have changed)
          */
          replicationServerInfos
              .put(electedRsInfo.getServerId(), electedRsInfo);
          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);
          // Handshake phase 1 exchange went well
@@ -935,10 +888,10 @@
          connectionError = true;
          connectPhaseLock.notify();
          if (replicationServerInfos.size() > 0)
          if (rsInfos.size() > 0)
          {
            logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN.toNormalizedString(),
                Utils.joinAsString(", ", replicationServerInfos.keySet()));
                Utils.joinAsString(", ", rsInfos.keySet()));
          }
          else
          {
@@ -949,6 +902,43 @@
    }
  }
  private void computeNewTopology(List<RSInfo> newRSInfos)
  {
    final int rsServerId = getRsServerId();
    Topology oldTopo;
    Topology newTopo;
    do
    {
      oldTopo = topology.get();
      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
    }
    while (!topology.compareAndSet(oldTopo, newTopo));
    if (logger.isTraceEnabled())
    {
      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
    }
  }
  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
      Topology newTopo)
  {
    final StringBuilder sb = new StringBuilder();
    sb.append("rsServerId=").append(rsServerId);
    if (newTopo.equals(oldTopo))
    {
      sb.append(", unchangedTopology=").append(newTopo);
    }
    else
    {
      sb.append(", oldTopology=").append(oldTopo);
      sb.append(", newTopology=").append(newTopo);
    }
    return sb;
  }
  /**
   * Connects to a replication server.
   *
@@ -2303,7 +2293,7 @@
    if (logger.isTraceEnabled())
    {
      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
          + rs.getServerId() + ") genId=" + generationID);
          + rs.getServerId() + ") genId=" + getGenerationID());
    }
  }
@@ -2408,7 +2398,8 @@
          */
          credit =
            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
        } else
        }
        else
        {
          credit = true;
        }
@@ -2451,6 +2442,11 @@
      }
      catch (IOException e)
      {
        if (logger.isTraceEnabled())
        {
          debugInfo("publish(): IOException caught: "
              + stackTraceToSingleLineString(e));
        }
        if (!retryOnFailure)
        {
          return false;
@@ -2463,23 +2459,24 @@
          try
          {
            connectPhaseLock.wait(100);
          } catch (InterruptedException e1)
          }
          catch (InterruptedException ignored)
          {
            // ignore
            if (logger.isTraceEnabled())
            {
              debugInfo("publish(): Interrupted exception raised : "
                  + e.getLocalizedMessage());
              debugInfo("publish(): InterruptedException caught 1: "
                  + stackTraceToSingleLineString(ignored));
            }
          }
        }
      } catch (InterruptedException e)
      }
      catch (InterruptedException ignored)
      {
        // just loop.
        if (logger.isTraceEnabled())
        {
          debugInfo("publish(): Interrupted exception raised."
              + e.getLocalizedMessage());
          debugInfo("publish(): InterruptedException caught 2: "
              + stackTraceToSingleLineString(ignored));
        }
      }
    }
@@ -2607,9 +2604,10 @@
          }
          // Update the replication servers ServerStates with new received info
          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
          for (int srvId : toIterable(monitorMsg.rsIterator()))
          {
            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
            if (rsInfo != null)
            {
              rsInfo.update(monitorMsg.getRSServerState(srvId));
@@ -2629,9 +2627,9 @@
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              final RSEvaluations evals =
                  computeBestReplicationServer(false, previousRsServerID, state,
                  replicationServerInfos, serverId, getGroupId(), generationID);
              final RSEvaluations evals = computeBestReplicationServer(
                  false, previousRsServerID, state,
                  rsInfos, serverId, getGroupId(), getGenerationID());
              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
              if (previousRsServerID != -1
                  && (bestServerInfo == null
@@ -2951,9 +2949,9 @@
   * Gets the info for DSs in the topology (except us).
   * @return The info for DSs in the topology (except us)
   */
  public List<DSInfo> getDsList()
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return dsList;
    return topology.get().replicaInfos;
  }
  /**
@@ -2962,10 +2960,15 @@
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  public List<RSInfo> getRsInfos()
  {
    return toRSInfos(topology.get().rsInfos);
  }
  private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
  {
    final List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
    for (ReplicationServerInfo rsInfo : rsInfos.values())
    {
      result.add(rsInfo.toRSInfo());
    }
@@ -2973,39 +2976,6 @@
  }
  /**
   * Computes the list of DSs connected to a particular RS.
   * @param rsId The RS id of the server one wants to know the connected DSs
   * @param dsList The list of DSinfo from which to compute things
   * @param rsServerId the serverId to use for the connectedDS
   * @return The list of connected DSs to the server rsId
   */
  private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
      int rsServerId)
  {
    final Set<Integer> connectedDSs = new HashSet<Integer>();
    if (rsServerId == rsId)
    {
      /*
      If we are computing connected DSs for the RS we are connected
      to, we should count the local DS as the DSInfo of the local DS is not
      sent by the replication server in the topology message. We must count
      ourselves as a connected server.
      */
      connectedDSs.add(getServerId());
    }
    for (DSInfo dsInfo : dsList)
    {
      if (dsInfo.getRsId() == rsId)
      {
        connectedDSs.add(dsInfo.getDsId());
      }
    }
    return connectedDSs;
  }
  /**
   * Processes an incoming TopologyMsg.
   * Updates the structures for the local view of the topology.
   *
@@ -3016,42 +2986,298 @@
   */
  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
  {
    if (logger.isTraceEnabled())
      debugInfo("receive TopologyMsg=" + topoMsg);
    // Store new DS list
    dsList = topoMsg.getDsList();
    // Update replication server info list with the received topology
    // information
    final Set<Integer> rssToKeep = new HashSet<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
    for (DSInfo dsInfo : newTopo.replicaInfos.values())
    {
      final int rsId = rsInfo.getId();
      rssToKeep.add(rsId); // Mark this server as still existing
      Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
      ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
      if (rsInfo2 == null)
      {
        // New replication server, create info for it add it to the list
        rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
        setLocallyConfiguredFlag(rsInfo2);
        replicationServerInfos.put(rsId, rsInfo2);
      }
      else
      {
        // Update the existing info for the replication server
        rsInfo2.update(rsInfo, connectedDSs);
      }
      domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
          .getEclIncludesForDeletes());
    }
  }
  private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
  {
    Topology oldTopo;
    Topology newTopo;
    do
    {
      oldTopo = topology.get();
      newTopo = new Topology(topoMsg, getServerId(), rsServerId,
              getReplicationServerUrls(), oldTopo.rsInfos);
    }
    while (!topology.compareAndSet(oldTopo, newTopo));
    if (logger.isTraceEnabled())
    {
      final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
      sb.append(" received TopologyMsg=").append(topoMsg);
      debugInfo(sb);
    }
    return newTopo;
  }
  /**
   * Contains the last known state of the replication topology.
   */
  static final class Topology
  {
    /**
     * The RS's serverId that this DS was connected to when this topology state
     * was computed.
     */
    private final int rsServerId;
    /**
     * Info for other DSs.
     * <p>
     * Warning: does not contain info for us (for our server id)
     */
    final Map<Integer, DSInfo> replicaInfos;
    /**
     * The map of replication server info initialized at connection time and
     * regularly updated. This is used to decide to which best suitable
     * replication server one wants to connect. Key: replication server id
     * Value: replication server info for the matching replication server id
     */
    final Map<Integer, ReplicationServerInfo> rsInfos;
    private Topology()
    {
      this.rsServerId = -1;
      this.replicaInfos = Collections.emptyMap();
      this.rsInfos = Collections.emptyMap();
    }
    // Remove any replication server that may have disappeared from the topology
    replicationServerInfos.keySet().retainAll(rssToKeep);
    for (DSInfo info : dsList)
    /**
     * Constructor to use when only the RSInfos need to be recomputed.
     *
     * @param dsInfosToKeep
     *          the DSInfos that will be stored as is
     * @param newRSInfos
     *          the new RSInfos from which to compute the new topology
     * @param dsServerId
     *          the DS serverId
     * @param rsServerId
     *          the current connected RS serverId
     * @param configuredReplicationServerUrls
     *          the configured replication server URLs
     * @param previousRsInfos
     *          the RSInfos computed in the previous Topology object
     */
    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
        int dsServerId, int rsServerId,
        Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      domain.setEclIncludes(info.getDsId(), info.getEclIncludes(),
          info.getEclIncludesForDeletes());
      this.rsServerId = rsServerId;
      this.replicaInfos = dsInfosToKeep;
      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }
    /**
     * Constructor to use when a new TopologyMsg has been received.
     *
     * @param topoMsg
     *          the topology message containing the new DSInfos and RSInfos from
     *          which to compute the new topology
     * @param dsServerId
     *          the DS serverId
     * @param rsServerId
     *          the current connected RS serverId
     * @param configuredReplicationServerUrls
     *          the configured replication server URLs
     * @param previousRsInfos
     *          the RSInfos computed in the previous Topology object
     */
    Topology(TopologyMsg topoMsg, int dsServerId,
        int rsServerId, Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      this.rsServerId = rsServerId;
      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
          previousRsInfos, configuredReplicationServerUrls);
    }
    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
        int dsServerId)
    {
      final Map<Integer, DSInfo> copy = new HashMap<Integer, DSInfo>(dsInfos);
      copy.remove(dsServerId);
      return Collections.unmodifiableMap(copy);
    }
    private Map<Integer, ReplicationServerInfo> computeRSInfos(
        int dsServerId, List<RSInfo> newRsInfos,
        Map<Integer, ReplicationServerInfo> previousRsInfos,
        Set<String> configuredReplicationServerUrls)
    {
      final Map<Integer, ReplicationServerInfo> results =
          new HashMap<Integer, ReplicationServerInfo>(previousRsInfos);
      // Update replication server info list with the received topology info
      final Set<Integer> rssToKeep = new HashSet<Integer>();
      for (RSInfo newRSInfo : newRsInfos)
      {
        final int rsId = newRSInfo.getId();
        rssToKeep.add(rsId); // Mark this server as still existing
        Set<Integer> connectedDSs =
            computeDSsConnectedTo(rsId, dsServerId);
        ReplicationServerInfo rsInfo = results.get(rsId);
        if (rsInfo == null)
        {
          // New replication server, create info for it add it to the list
          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
          results.put(rsId, rsInfo);
        }
        else
        {
          // Update the existing info for the replication server
          rsInfo.update(newRSInfo, connectedDSs);
        }
      }
      // Remove any replication server that may have disappeared from the
      // topology
      results.keySet().retainAll(rssToKeep);
      return Collections.unmodifiableMap(results);
    }
    /** Computes the list of DSs connected to a particular RS. */
    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
    {
      final Set<Integer> connectedDSs = new HashSet<Integer>();
      if (rsServerId == rsId)
      {
        /*
         * If we are computing connected DSs for the RS we are connected to, we
         * should count the local DS as the DSInfo of the local DS is not sent
         * by the replication server in the topology message. We must count
         * ourselves as a connected server.
         */
        connectedDSs.add(dsServerId);
      }
      for (DSInfo dsInfo : replicaInfos.values())
      {
        if (dsInfo.getRsId() == rsId)
        {
          connectedDSs.add(dsInfo.getDsId());
        }
      }
      return connectedDSs;
    }
    /**
     * Sets the locally configured flag for the passed ReplicationServerInfo
     * object, analyzing the local configuration.
     *
     * @param rsInfo
     *          the Replication server to check and update
     * @param configuredReplicationServerUrls
     */
    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
        Set<String> configuredReplicationServerUrls)
    {
      // Determine if the passed ReplicationServerInfo has a URL that is present
      // in the locally configured replication servers
      String rsUrl = rsInfo.getServerURL();
      if (rsUrl == null)
      {
        // The ReplicationServerInfo has been generated from a server with
        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
        // ignore this server as we do not know how to connect to it
        rsInfo.setLocallyConfigured(false);
        return;
      }
      for (String serverUrl : configuredReplicationServerUrls)
      {
        if (isSameReplicationServerUrl(serverUrl, rsUrl))
        {
          // This RS is locally configured, mark this
          rsInfo.setLocallyConfigured(true);
          rsInfo.setServerURL(serverUrl);
          return;
        }
      }
      rsInfo.setLocallyConfigured(false);
    }
    /** {@inheritDoc} */
    @Override
    public boolean equals(Object obj)
    {
      if (this == obj)
      {
        return true;
      }
      if (obj == null || getClass() != obj.getClass())
      {
        return false;
      }
      final Topology other = (Topology) obj;
      return rsServerId == other.rsServerId
          && equals(replicaInfos, other.replicaInfos)
          && equals(rsInfos, other.rsInfos)
          && urlsEqual1(replicaInfos, other.replicaInfos)
          && urlsEqual2(rsInfos, other.rsInfos);
    }
    private boolean equals(Object o1, Object o2)
    {
      return o1 == null ? o2 == null : o1.equals(o2);
    }
    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
        Map<Integer, DSInfo> replicaInfos2)
    {
      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
      {
        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
        if (!equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
        {
          return false;
        }
      }
      return true;
    }
    private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
        Map<Integer, ReplicationServerInfo> rsInfos2)
    {
      for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
      {
        ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
        if (!equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
        {
          return false;
        }
      }
      return true;
    }
    /** {@inheritDoc} */
    @Override
    public int hashCode()
    {
      final int prime = 31;
      int result = 1;
      result = prime * result + rsServerId;
      result = prime * result
          + (replicaInfos == null ? 0 : replicaInfos.hashCode());
      result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
      return result;
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
          + ", rsInfos=" + rsInfos.values();
    }
  }
@@ -3197,15 +3423,15 @@
      .append(" \"").append(getBaseDN()).append(" ")
      .append(getServerId()).append("\",")
      .append(" groupId=").append(getGroupId())
      .append(", genId=").append(generationID)
      .append(", genId=").append(getGenerationID())
      .append(", ");
    connectedRS.get().toString(sb);
    return sb.toString();
  }
  private void debugInfo(String message)
  private void debugInfo(CharSequence message)
  {
    logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
        + " and serverId=" + getServerId() + " " + message);
        + " and serverId=" + getServerId() + ": " + message);
  }
}