mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
06.11.2009 3a9e211d36ee94ff99941943b3b51e0f768624f5
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
import org.opends.server.replication.server.
  ReplicationServer.GlobalServerId;
/**
 * This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
  // late or not
  private StatusAnalyzer statusAnalyzer = null;
  // The monitoring publisher that periodically sends monitoring messages to the
  // topology
  private MonitoringPublisher monitoringPublisher = null;
  /*
   * The following map contains one balanced tree for each replica ID
   * to which we are currently publishing
@@ -1066,6 +1072,17 @@
          // Try doing job anyway...
        }
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        {
          if (debugEnabled())
            TRACER.debugInfo("In " +
              replicationServer.getMonitorInstanceName() +
              " remote server " + handler.getMonitorInstanceName() + " is " +
              "the last RS/DS to be stopped: stopping monitoring publisher");
          stopMonitoringPublisher();
        }
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsValue(handler))
@@ -1082,44 +1099,39 @@
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else
        } else if (directoryServers.containsValue(handler))
        {
          if (directoryServers.containsValue(handler))
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (directoryServers.size() == 1)
          {
            // If this is the last DS for the domain,
            // shutdown the status analyzer
            if (directoryServers.size() == 1)
            {
              if (debugEnabled())
                TRACER.debugInfo("In " +
                    replicationServer.getMonitorInstanceName() +
                    " remote server " + handler.getMonitorInstanceName() +
            if (debugEnabled())
              TRACER.debugInfo("In " +
                replicationServer.getMonitorInstanceName() +
                " remote server " + handler.getMonitorInstanceName() +
                " is the last DS to be stopped: stopping status analyzer");
              stopStatusAnalyzer();
            }
            stopStatusAnalyzer();
          }
            unregisterServerHandler(handler);
            handler.shutdown();
          unregisterServerHandler(handler);
          handler.shutdown();
            // Check if generation id has to be reset
            mayResetGenerationId();
          // Check if generation id has to be reset
          mayResetGenerationId();
          if (!shutdown)
          {
            // Update the remote replication servers with our list
            // of connected LDAP servers
            if (!shutdown)
            {
              buildAndSendTopoInfoToRSs();
              // Warn our DSs that a RS or DS has quit (does not use this
              // handler as already removed from list)
              buildAndSendTopoInfoToDSs(null);
            }
            buildAndSendTopoInfoToRSs();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
          }
          else if (otherHandlers.contains(handler))
          {
            unRegisterHandler(handler);
            handler.shutdown();
          }
        } else if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
          handler.shutdown();
        }
      }
      catch(Exception e)
      {
@@ -1581,99 +1593,51 @@
        // in the topology.
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          // Monitoring information requested by a DS
          MonitorMsg monitorMsg =
            createGlobalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
          try
           if (monitorMsg != null)
          {
            returnMsg.setReplServerDbState(getDbServerState());
            // Update the information we have about all servers
            // in the topology.
            MonitorData md = computeMonitorData();
            // Add the informations about the Replicas currently in
            // the topology.
            Iterator<Integer> it = md.ldapIterator();
            while (it.hasNext())
            try
            {
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getLDAPServerState(replicaId),
                  md.getApproxFirstMissingDate(replicaId), true);
            }
            // Add the informations about the Replication Servers
            // currently in the topology.
            it = md.rsIterator();
            while (it.hasNext())
              senderHandler.send(monitorMsg);
            } catch (IOException e)
            {
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getRSStates(replicaId),
                  md.getRSApproxFirstMissingDate(replicaId), false);
              // the connection was closed.
            }
          }
          catch (DirectoryException e)
          {
            // If we can't compute the Monitor Information, send
            // back an empty message.
          }
          try
          {
            senderHandler.send(returnMsg);
          } catch (IOException e)
          {
            // the connection was closed.
          }
          return;
        }
        MonitorMsg monitorMsg =
          new MonitorMsg(msg.getDestination(), msg.getsenderID());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (DataServerHandler lsh : this.directoryServers.values())
        } else
        {
          monitorMsg.setServerState(
            lsh.getServerId(),
            lsh.getServerState(),
            lsh.getApproxFirstMissingDate(),
            true);
        }
          // Monitoring information requested by a RS
          MonitorMsg monitorMsg =
            createLocalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
        // Same for the connected RS
        for (ReplicationServerHandler rsh : this.replicationServers.values())
        {
          monitorMsg.setServerState(
            rsh.getServerId(),
            rsh.getServerState(),
            rsh.getApproxFirstMissingDate(),
            false);
        }
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerDbState(this.getDbServerState());
        try
        {
          senderHandler.send(monitorMsg);
        } catch (Exception e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
              Integer.toString((msg.getDestination()))));
          if (monitorMsg != null)
          {
            try
            {
              senderHandler.send(monitorMsg);
            } catch (Exception e)
            {
              // We log the error. The requestor will detect a timeout or
              // any other failure on the connection.
              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                  Integer.toString((msg.getDestination()))));
            }
          }
        }
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg =
          (MonitorMsg) msg;
        receivesMonitorDataResponse(monitorMsg);
        GlobalServerId globalServerId =
          new GlobalServerId(baseDn, senderHandler.getServerId());
        receivesMonitorDataResponse(monitorMsg, globalServerId);
      } else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * whole topology.
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   */
  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg returnMsg =
      new MonitorMsg(sender, destination);
    try
    {
      returnMsg.setReplServerDbState(getDbServerState());
      // Update the information we have about all servers
      // in the topology.
      MonitorData md = computeMonitorData();
      // Add the informations about the Replicas currently in
      // the topology.
      Iterator<Integer> it = md.ldapIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getLDAPServerState(replicaId),
            md.getApproxFirstMissingDate(replicaId), true);
      }
      // Add the informations about the Replication Servers
      // currently in the topology.
      it = md.rsIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getRSStates(replicaId),
            md.getRSApproxFirstMissingDate(replicaId), false);
      }
    }
    catch (DirectoryException e)
    {
      // If we can't compute the Monitor Information, send
      // back an empty message.
    }
    return returnMsg;
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * topology directly connected to this RS. This includes information for:
   * - local RS
   * - all direct DSs
   * - all direct RSs
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg monitorMsg = null;
    try {
      // Lock domain as we need to go through connected servers list
      lock();
      monitorMsg = new MonitorMsg(sender, destination);
      // Populate for each connected LDAP Server
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      {
        monitorMsg.setServerState(
          lsh.getServerId(),
          lsh.getServerState(),
          lsh.getApproxFirstMissingDate(),
          true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      {
        monitorMsg.setServerState(
          rsh.getServerId(),
          rsh.getServerState(),
          rsh.getApproxFirstMissingDate(),
          false);
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(this.getDbServerState());
    } catch(InterruptedException e)
    {
      // At lock, too bad...
    } finally
    {
      if (hasLock())
        release();
    }
    return monitorMsg;
  }
  /**
   * Shutdown this ReplicationServerDomain.
   */
  public void shutdown()
@@ -1831,8 +1905,7 @@
  /**
   * Send a TopologyMsg to all the connected directory servers in order to
   * let.
   * them know the topology (every known DSs and RSs)
   * let them know the topology (every known DSs and RSs).
   * @param notThisOne If not null, the topology message will not be sent to
   * this passed server.
   */
@@ -1931,10 +2004,11 @@
      dsInfos.add(serverHandler.toDSInfo());
    }
    // Create info for us (local RS)
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId());
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
    // Add our own info (local RS)
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId());
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    // Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @return The number of response that should come back.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  int initializeMonitorData() throws DirectoryException
  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
@@ -2539,7 +2616,7 @@
    }
    // Send the request for remote monitor data to the
    return sendMonitorDataRequest();
    sendMonitorDataRequest(expectedMonitoringMsg);
  }
  /**
@@ -2566,22 +2643,25 @@
  /**
   * Sends a MonitorRequest message to all connected RS.
   * @return the number of requests sent.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   * @throws DirectoryException when a problem occurs.
   */
  protected int sendMonitorDataRequest()
  protected void sendMonitorDataRequest(
    List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    int sent = 0;
    try
    {
      for (ServerHandler rs : replicationServers.values())
      {
        int serverId = rs.getServerId();
        MonitorRequestMsg msg =
          new MonitorRequestMsg(this.replicationServer.getServerId(),
          rs.getServerId());
          serverId);
        rs.send(msg);
        sent++;
        // Store the fact that we expect a MonitoringMsg back from this server
        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
      }
    } catch (Exception e)
    {
@@ -2590,7 +2670,6 @@
      throw new DirectoryException(ResultCode.OTHER,
        message, e);
    }
    return sent;
  }
  /**
@@ -2598,8 +2677,10 @@
   * and stores the data received.
   *
   * @param msg The message to be processed.
   * @param globalServerHandlerId server handler that is receiving the message.
   */
  public void receivesMonitorDataResponse(MonitorMsg msg)
  private void receivesMonitorDataResponse(MonitorMsg msg,
    GlobalServerId globalServerId)
  {
    try
    {
@@ -2677,7 +2758,7 @@
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      replicationServer.responseReceived();
      replicationServer.responseReceived(globalServerId);
    } catch (Exception e)
    {
@@ -2832,6 +2913,57 @@
  }
  /**
   * Starts the monitoring publisher for the domain.
   */
  public void startMonitoringPublisher()
  {
    if (monitoringPublisher == null)
    {
      long period =
        replicationServer.getMonitoringPublisherPeriod();
      if (period > 0) // 0 means no monitoring publisher
      {
        monitoringPublisher = new MonitoringPublisher(this, period);
        monitoringPublisher.start();
      }
    }
  }
  /**
   * Stops the monitoring publisher for the domain.
   */
  public void stopMonitoringPublisher()
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.shutdown();
      monitoringPublisher.waitForShutdown();
      monitoringPublisher = null;
    }
  }
  /**
   * Tests if the monitoring publisher for this domain is running.
   * @return True if the monitoring publisher is running, false otherwise.
   */
  public boolean isRunningMonitoringPublisher()
  {
    return (monitoringPublisher != null);
  }
  /**
   * Update the monitoring publisher with the new period value.
   * @param period The new period value.
   */
  public void updateMonitoringPublisher(long period)
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.setPeriod(period);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override