mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
16.01.2012 0aac3576864aa130aac13292604804806bd13255
Code and Typo cleanup. Some reformatting included.
1 files modified
251 ■■■■■ changed files
opends/src/ads/org/opends/admin/ads/TopologyCache.java 251 ●●●●● patch | view | raw | blame | history
opends/src/ads/org/opends/admin/ads/TopologyCache.java
@@ -23,15 +23,14 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 *      Portions copyright 2011-2012 ForgeRock AS
 */
package org.opends.admin.ads;
import static org.opends.messages.QuickSetupMessages.
 INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER;
import static org.opends.messages.QuickSetupMessages.
 INFO_NOT_GLOBAL_ADMINISTRATOR_PROVIDED;
import static org.opends.messages.QuickSetupMessages
    .INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER;
import static org.opends.messages.QuickSetupMessages
    .INFO_NOT_GLOBAL_ADMINISTRATOR_PROVIDED;
import java.util.Collection;
import java.util.Date;
@@ -61,33 +60,34 @@
import org.opends.quicksetup.util.Utils;
/**
 * This class allows to read the configuration of the different servers that
 * are registered in a given ADS server.  It provides a read only view of the
 * configuration of the servers and of the replication topologies that might
 * be configured between them.
 * This class allows to read the configuration of the different servers that are
 * registered in a given ADS server. It provides a read only view of the
 * configuration of the servers and of the replication topologies that might be
 * configured between them.
 */
public class TopologyCache
{
  private ADSContext adsContext;
  private ApplicationTrustManager trustManager;
  private int timeout;
  private String dn;
  private String pwd;
  private Set<ServerDescriptor> servers = new HashSet<ServerDescriptor>();
  private Set<SuffixDescriptor> suffixes = new HashSet<SuffixDescriptor>();
  private LinkedHashSet<PreferredConnection> preferredConnections =
    new LinkedHashSet<PreferredConnection>();
  private TopologyCacheFilter filter = new TopologyCacheFilter();
  private final ADSContext adsContext;
  private final ApplicationTrustManager trustManager;
  private final int timeout;
  private final String bindDN;
  private final String bindPwd;
  private final Set<ServerDescriptor> servers =
      new HashSet<ServerDescriptor>();
  private final Set<SuffixDescriptor> suffixes =
      new HashSet<SuffixDescriptor>();
  private final LinkedHashSet<PreferredConnection> preferredConnections =
      new LinkedHashSet<PreferredConnection>();
  private final TopologyCacheFilter filter = new TopologyCacheFilter();
  private final boolean isMultiThreaded = true;
  private final static int MULTITHREAD_TIMEOUT = 90 * 1000;
  private static final Logger LOG =
    Logger.getLogger(TopologyCache.class.getName());
      Logger.getLogger(TopologyCache.class.getName());
  /**
   * Constructor of the TopologyCache.
   *
   * @param adsContext the adsContext to the ADS registry.
   * @param trustManager the ApplicationTrustManager that must be used to trust
   * certificates when we create connections to the registered servers to read
@@ -96,18 +96,19 @@
   * Use {@code 0} to express no timeout.
   */
  public TopologyCache(ADSContext adsContext,
      ApplicationTrustManager trustManager,
      int timeout)
                       ApplicationTrustManager trustManager,
                       int timeout)
  {
    this.adsContext = adsContext;
    this.trustManager = trustManager;
    this.timeout = timeout;
    dn = ConnectionUtils.getBindDN(adsContext.getDirContext());
    pwd = ConnectionUtils.getBindPassword(adsContext.getDirContext());
    bindDN = ConnectionUtils.getBindDN(adsContext.getDirContext());
    bindPwd = ConnectionUtils.getBindPassword(adsContext.getDirContext());
  }
  /**
   * Reads the configuration of the registered servers.
   *
   * @throws TopologyCacheException if there is an issue reading the
   * configuration of the registered servers.
   */
@@ -117,38 +118,40 @@
    servers.clear();
    try
    {
      Set<Map<ServerProperty,Object>> adsServers =
        adsContext.readServerRegistry();
      Set<Map<ServerProperty, Object>> adsServers =
          adsContext.readServerRegistry();
      Set<ServerLoader> threadSet = new HashSet<ServerLoader>();
      for (Map<ServerProperty,Object> serverProperties : adsServers)
      for (Map<ServerProperty, Object> serverProperties : adsServers)
      {
        ServerLoader t = getServerLoader(serverProperties);
        if (isMultiThreaded)
        {
            t.start();
            threadSet.add(t);
          t.start();
          threadSet.add(t);
        }
        else
        {
            t.run();
          t.run();
        }
      }
      if (isMultiThreaded)
      {
        joinThreadSet(threadSet);
      }
      /* Try to consolidate things (even if the data is not complete). */
      /*
       * Try to consolidate things (even if the data is not complete).
       */
      HashMap<LdapName, Set<SuffixDescriptor>> hmSuffixes =
        new HashMap<LdapName, Set<SuffixDescriptor>>();
          new HashMap<LdapName, Set<SuffixDescriptor>>();
      for (ServerLoader loader : threadSet)
      {
        ServerDescriptor descriptor = loader.getServerDescriptor();
        for (ReplicaDescriptor replica : descriptor.getReplicas())
        {
          LOG.log(Level.INFO, "Handling replica with dn: "+
              replica.getSuffix().getDN());
          LOG.log(Level.INFO, "Handling replica with dn: "
              + replica.getSuffix().getDN());
          boolean suffixFound = false;
          LdapName dn = new LdapName(replica.getSuffix().getDN());
@@ -205,6 +208,7 @@
  /**
   * Returns the trust manager used by this class.
   *
   * @return the trust manager used by this class.
   */
  public ApplicationTrustManager getTrustManager()
@@ -214,6 +218,7 @@
  /**
   * Returns the timeout to establish the connection in milliseconds.
   *
   * @return the timeout to establish the connection in milliseconds. Returns
   * {@code 0} to express no timeout.
   */
@@ -224,6 +229,7 @@
  /**
   * Reads the replication monitoring.
   *
   * @throws NamingException if an error occurs reading the replication
   * monitoring.
   */
@@ -248,7 +254,7 @@
        // configuration, so assume that we might be able to read monitoring
        // (even if an exception occurred before).
        Set<ReplicaDescriptor> candidateReplicas =
          new HashSet<ReplicaDescriptor>();
            new HashSet<ReplicaDescriptor>();
        // It contains replication information: analyze it.
        String repServer = server.getReplicationServerHostPort();
        for (SuffixDescriptor suffix : getSuffixes())
@@ -266,7 +272,7 @@
        if (!candidateReplicas.isEmpty())
        {
          Set<ReplicaDescriptor> updatedReplicas =
            new HashSet<ReplicaDescriptor>();
              new HashSet<ReplicaDescriptor>();
          try
          {
            updateReplicas(server, candidateReplicas, updatedReplicas);
@@ -289,8 +295,9 @@
  /**
   * Sets the list of LDAP URLs and connection type that are preferred to be
   * used to connect to the servers.  When we have a server to which we can
   * used to connect to the servers. When we have a server to which we can
   * connect using a URL on the list we will try to use it.
   *
   * @param cnx the list of preferred connections.
   */
  public void setPreferredConnections(LinkedHashSet<PreferredConnection> cnx)
@@ -301,9 +308,10 @@
  /**
   * Returns the list of LDAP URLs and connection type that are preferred to be
   * used to connect to the servers.  If a URL is on this list, when we have a
   * used to connect to the servers. If a URL is on this list, when we have a
   * server to which we can connect using that URL and the associated connection
   * type we will try to use it.
   *
   * @return the list of preferred connections.
   */
  public LinkedHashSet<PreferredConnection> getPreferredConnections()
@@ -313,6 +321,7 @@
  /**
   * Returns a Set containing all the servers that are registered in the ADS.
   *
   * @return a Set containing all the servers that are registered in the ADS.
   */
  public Set<ServerDescriptor> getServers()
@@ -325,6 +334,7 @@
  /**
   * Returns a Set containing the suffixes (replication topologies) that could
   * be retrieved after the last call to reloadTopology.
   *
   * @return a Set containing the suffixes (replication topologies) that could
   * be retrieved after the last call to reloadTopology.
   */
@@ -337,6 +347,7 @@
  /**
   * Returns the filter to be used when retrieving information.
   *
   * @return the filter to be used when retrieving information.
   */
  public TopologyCacheFilter getFilter()
@@ -347,16 +358,17 @@
  /**
   * Method used to wait at most a certain time (MULTITHREAD_TIMEOUT) for the
   * different threads to finish.
   * @param threadSet the list of threads (we assume that they are started)
   * that we must wait for.
   *
   * @param threadSet the list of threads (we assume that they are started) that
   * we must wait for.
   */
  private void joinThreadSet(Set<ServerLoader> threadSet)
  {
    Date startDate = new Date();
    for (ServerLoader t : threadSet)
    {
      long timeToJoin = MULTITHREAD_TIMEOUT - System.currentTimeMillis() +
      startDate.getTime();
      long timeToJoin = MULTITHREAD_TIMEOUT - System.currentTimeMillis()
          + startDate.getTime();
      try
      {
        if (timeToJoin > 0)
@@ -375,26 +387,28 @@
    }
    Date endDate = new Date();
    long workingTime = endDate.getTime() - startDate.getTime();
    LOG.log(Level.INFO, "Loading ended at "+ workingTime + " ms");
    LOG.log(Level.INFO, "Loading ended at " + workingTime + " ms");
  }
  /**
   * Creates a ServerLoader object based on the provided server properties.
   * @param serverProperties the server properties to be used to generate
   * the ServerLoader.
   *
   * @param serverProperties the server properties to be used to generate the
   * ServerLoader.
   * @return a ServerLoader object based on the provided server properties.
   */
  private ServerLoader getServerLoader(
      Map<ServerProperty,Object> serverProperties)
      Map<ServerProperty, Object> serverProperties)
  {
    return new ServerLoader(serverProperties, dn, pwd,
    return new ServerLoader(serverProperties, bindDN, bindPwd,
        trustManager == null ? null : trustManager.createCopy(),
            timeout,
            getPreferredConnections(), getFilter());
        timeout,
        getPreferredConnections(), getFilter());
  }
  /**
   * Returns the adsContext used by this TopologyCache.
   *
   * @return the adsContext used by this TopologyCache.
   */
  public ADSContext getAdsContext()
@@ -402,19 +416,18 @@
    return adsContext;
  }
  /**
   * Returns a set of error messages encountered in the TopologyCache.
   *
   * @return a set of error messages encountered in the TopologyCache.
   */
  public LinkedHashSet<Message> getErrorMessages()
  {
    Set<TopologyCacheException> exceptions =
      new HashSet<TopologyCacheException>();
    Set<ServerDescriptor> servers = getServers();
        new HashSet<TopologyCacheException>();
    Set<ServerDescriptor> theServers = getServers();
    LinkedHashSet<Message> exceptionMsgs = new LinkedHashSet<Message>();
    for (ServerDescriptor server : servers)
    for (ServerDescriptor server : theServers)
    {
      TopologyCacheException e = server.getLastException();
      if (e != null)
@@ -422,7 +435,9 @@
        exceptions.add(e);
      }
    }
    /* Check the exceptions and see if we throw them or not. */
    /*
     * Check the exceptions and see if we throw them or not.
     */
    for (TopologyCacheException e : exceptions)
    {
      switch (e.getType())
@@ -430,61 +445,62 @@
        case NOT_GLOBAL_ADMINISTRATOR:
          exceptionMsgs.add(INFO_NOT_GLOBAL_ADMINISTRATOR_PROVIDED.get());
        break;
      case GENERIC_CREATING_CONNECTION:
        if ((e.getCause() != null) &&
            Utils.isCertificateException(e.getCause()))
        {
          exceptionMsgs.add(
              INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER.get(
              e.getHostPort(), e.getCause().getMessage()));
        }
        else
        {
          break;
        case GENERIC_CREATING_CONNECTION:
          if ((e.getCause() != null)
              && Utils.isCertificateException(e.getCause()))
          {
            exceptionMsgs.add(
                INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER.get(
                e.getHostPort(), e.getCause().getMessage()));
          }
          else
          {
            exceptionMsgs.add(Utils.getMessage(e));
          }
          break;
        default:
          exceptionMsgs.add(Utils.getMessage(e));
        }
        break;
      default:
        exceptionMsgs.add(Utils.getMessage(e));
      }
    }
    return exceptionMsgs;
  }
  /**
   * Updates the monitoring information of the provided replicas using the
   * information located in cn=monitor of a given replication server.
   *
   * @param replicationServer the replication server.
   * @param candidateReplicas the collection of replicas that must be updated.
   * @param updatedReplicas the collection of replicas that are actually
   * updated.  This list is updated by the method.
   * updated. This list is updated by the method.
   */
  private void updateReplicas(ServerDescriptor replicationServer,
      Collection<ReplicaDescriptor> candidateReplicas,
      Collection<ReplicaDescriptor> updatedReplicas) throws NamingException
                              Collection<ReplicaDescriptor> candidateReplicas,
                              Collection<ReplicaDescriptor> updatedReplicas)
      throws NamingException
  {
    SearchControls ctls = new SearchControls();
    ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);
    ctls.setReturningAttributes(
        new String[] {
            "approx-older-change-not-synchronized-millis", "missing-changes",
            "domain-name", "server-id"
        new String[]
        {
          "approx-older-change-not-synchronized-millis", "missing-changes",
          "domain-name", "server-id"
        });
    String filter = "(missing-changes=*)";
    LdapName jndiName = new LdapName("cn=monitor");
    InitialLdapContext ctx = null;
    NamingEnumeration<SearchResult> monitorEntries = null;
    try
    {
      ServerLoader loader =
        getServerLoader(replicationServer.getAdsProperties());
          getServerLoader(replicationServer.getAdsProperties());
      ctx = loader.createContext();
      monitorEntries = ctx.search(jndiName, filter, ctls);
      while(monitorEntries.hasMore())
      monitorEntries = ctx.search(
          new LdapName("cn=monitor"), "(missing-changes=*)", ctls);
      while (monitorEntries.hasMore())
      {
        SearchResult sr = monitorEntries.next();
@@ -493,44 +509,47 @@
        try
        {
          replicaId =
            new Integer(ConnectionUtils.getFirstValue(sr, "server-id"));
              new Integer(ConnectionUtils.getFirstValue(sr, "server-id"));
        }
        catch (Throwable t)
        {
          LOG.log(Level.WARNING, "Unexpected error reading replica ID: "+t, t);
          LOG.log(Level.WARNING, "Unexpected error reading replica ID: " + t,
              t);
        }
        for (ReplicaDescriptor replica: candidateReplicas)
        for (ReplicaDescriptor replica : candidateReplicas)
        {
          if (Utils.areDnsEqual(dn, replica.getSuffix().getDN()) &&
              replica.isReplicated() &&
              (replica.getReplicationId() == replicaId))
          if (Utils.areDnsEqual(dn, replica.getSuffix().getDN())
              && replica.isReplicated()
              && (replica.getReplicationId() == replicaId))
          {
            try
            // This statistic is optional.
            String s = ConnectionUtils.getFirstValue(sr,
                "approx-older-change-not-synchronized-millis");
            if (s != null)
            {
              // This statistic is optional.
              String s = ConnectionUtils.getFirstValue(sr,
                  "approx-older-change-not-synchronized-millis");
              if (s != null)
              try
              {
                replica.setAgeOfOldestMissingChange(Long.valueOf(s));
              }
              catch (Throwable t)
              {
                LOG.log(Level.WARNING,
                    "Unexpected error reading age of oldest change: " + t, t);
              }
            }
            catch (Throwable t)
            s = ConnectionUtils.getFirstValue(sr, "missing-changes");
            if (s != null)
            {
              LOG.log(Level.WARNING,
                  "Unexpected error reading age of oldest change: "+t, t);
            }
            try
            {
              replica.setMissingChanges(
                  new Integer(ConnectionUtils.getFirstValue(sr,
                      "missing-changes")));
            }
            catch (Throwable t)
            {
              LOG.log(Level.WARNING,
                  "Unexpected error reading missing changes: "+t, t);
              try
              {
                replica.setMissingChanges(new Integer(s));
              }
              catch (Throwable t)
              {
                LOG.log(Level.WARNING,
                    "Unexpected error reading missing changes: " + t, t);
              }
            }
            updatedReplicas.add(replica);
          }
@@ -544,7 +563,15 @@
    {
      if (monitorEntries != null)
      {
        monitorEntries.close();
        try
        {
          monitorEntries.close();
        }
        catch (Throwable t)
        {
          LOG.log(Level.WARNING,
              "Unexpected error closing enumeration on monitor entries" + t, t);
        }
      }
      if (ctx != null)
      {