mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
03.41.2008 2942eaa1b7264228c9ca7535aabd206e663581e9
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.MonitorMessage;
import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
/**
@@ -118,6 +128,34 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
  private long remoteMonitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /* The date of the last time they have been elaborated */
  private long validityDate = 0;
  // For each LDAP server, its server state
  private HashMap<Short, ServerState> LDAPStates =
    new HashMap<Short, ServerState>();
  // For each LDAP server, the last CN it published
  private HashMap<Short, ChangeNumber> maxCNs =
    new HashMap<Short, ChangeNumber>();
  // For each LDAP server, an approximation of the date of the first missing
  // change
  private HashMap<Short, Long> approxFirstMissingDate =
    new HashMap<Short, Long>();
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
@@ -352,7 +390,7 @@
        }
        else
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          if (rsh.hasRemoteLDAPServers())
          {
            lDAPServersConnectedInTheTopology = true;
@@ -636,7 +674,7 @@
        // server connected
        for (ServerHandler rsh : replicationServers.values())
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          if (rsh.hasRemoteLDAPServers())
          {
            servers.add(rsh);
          }
@@ -693,15 +731,58 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // A replication server is not expected to be the destination
    // of a routable message except for an error message.
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (msg.getDestination() == this.replicationServer.getServerId())
    {
      if (msg instanceof ErrorMessage)
      {
        ErrorMessage errorMsg = (ErrorMessage)msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(
                   errorMsg.getDetails()));
            errorMsg.getDetails()));
      }
      else if (msg instanceof MonitorRequestMessage)
      {
        MonitorRequestMessage replServerMonitorRequestMsg =
          (MonitorRequestMessage) msg;
        MonitorMessage monitorMsg =
          new MonitorMessage(
              replServerMonitorRequestMsg.getDestination(),
              replServerMonitorRequestMsg.getsenderID());
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerState(this.getDbServerState());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (ServerHandler lsh : this.connectedServers.values())
        {
          monitorMsg.setLDAPServerState(
              lsh.getServerId(),
              lsh.getServerState(),
              lsh.getApproxFirstMissingDate());
        }
        try
        {
          senderHandler.send(monitorMsg);
        }
        catch(Exception e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
              Short.toString((msg.getDestination()))));
        }
      }
      else if (msg instanceof MonitorMessage)
      {
        MonitorMessage monitorMsg =
          (MonitorMessage) msg;
        receivesMonitorDataResponse(monitorMsg);
      }
      else
      {
@@ -1156,4 +1237,288 @@
    {
      return replicationServer;
    }
    /*
     * Monitor Data generation
     */
    /**
     * Retrieves the remote monitor data.
     *
     * @throws DirectoryException When an error occurs.
     */
    protected void retrievesRemoteMonitorData()
      throws DirectoryException
    {
      if (validityDate > TimeThread.getTime())
      {
        // The current data are still valid. No need to renew them.
        return;
      }
      // Clean
      this.LDAPStates.clear();
      this.maxCNs.clear();
      // Init the maxCNs of our direct LDAP servers from our own dbstate
      for (ServerHandler rs : connectedServers.values())
      {
        short serverID = rs.getServerId();
        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
        if (cn == null)
        {
          // we have nothing in db for that server
          cn = new ChangeNumber(0, 0 , serverID);
        }
        this.maxCNs.put(serverID, cn);
      }
      ServerState replServerState = this.getDbServerState();
      Iterator<Short> it = replServerState.iterator();
      while (it.hasNext())
      {
        short sid = it.next();
        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
        ChangeNumber maxCN = this.maxCNs.get(sid);
        if ((maxCN != null) && (receivedCN.newer(maxCN)))
        {
          // We found a newer one
          this.maxCNs.remove(sid);
          this.maxCNs.put(sid, receivedCN);
        }
      }
      // Send Request to the other Replication Servers
      if (remoteMonitorResponsesSemaphore == null)
      {
        remoteMonitorResponsesSemaphore = new Semaphore(
            replicationServers.size() -1);
        sendMonitorDataRequest();
        // Wait reponses from them or timeout
        waitMonitorDataResponses(replicationServers.size());
      }
      else
      {
        // The processing of renewing the monitor cache is already running
        // We'll make it sleeping until the end
        while (remoteMonitorResponsesSemaphore!=null)
        {
          waitMonitorDataResponses(1);
        }
      }
      // Now we have the expected answers of an error occured
      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
      if (debugEnabled())
      {
        debugMonitorData();
      }
    }
    private void debugMonitorData()
    {
      String mds = " Monitor data=";
      Iterator<Short> ite = LDAPStates.keySet().iterator();
      while (ite.hasNext())
      {
        Short sid = ite.next();
        ServerState ss = LDAPStates.get(sid);
        mds += " LDAPState(" + sid + ")=" + ss.toString();
      }
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber cn = maxCNs.get(sid);
        mds += " maxCNs(" + sid + ")=" + cn.toString();
      }
      mds += "--";
      TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          mds);
    }
    /**
     * Sends a MonitorRequest message to all connected RS.
     * @throws DirectoryException when a problem occurs.
     */
    protected void sendMonitorDataRequest()
      throws DirectoryException
    {
      try
      {
        for (ServerHandler rs : replicationServers.values())
        {
          MonitorRequestMessage msg = new
            MonitorRequestMessage(this.replicationServer.getServerId(),
              rs.getServerId());
          rs.send(msg);
        }
      }
      catch(Exception e)
      {
        Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
        logError(message);
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
    }
    /**
     * Wait for the expected count of received MonitorMessage.
     * @param expectedResponses The number of expected answers.
     * @throws DirectoryException When an error occurs.
     */
    protected void waitMonitorDataResponses(int expectedResponses)
      throws DirectoryException
    {
      try
      {
        boolean allPermitsAcquired =
          remoteMonitorResponsesSemaphore.tryAcquire(
              expectedResponses,
              (long) 500, TimeUnit.MILLISECONDS);
        if (!allPermitsAcquired)
        {
          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
        }
        else
        {
          if (debugEnabled())
            TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Successfully received all " + replicationServers.size()
            + " expected monitor messages");
        }
      }
      catch(Exception e)
      {
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
      }
      finally
      {
        remoteMonitorResponsesSemaphore = null;
      }
    }
    /**
     * Processes a Monitor message receives from a remote Replication Server
     * and stores the data received.
     *
     * @param msg The message to be processed.
     */
    public void receivesMonitorDataResponse(MonitorMessage msg)
    {
      if (remoteMonitorResponsesSemaphore == null)
      {
        // Ignoring the remote monitor data because an error occured previously
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN accross the RSes
        ServerState replServerState = msg.getReplServerState();
        Iterator<Short> it = replServerState.iterator();
        while (it.hasNext())
        {
          short sid = it.next();
          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
          ChangeNumber maxCN = this.maxCNs.get(sid);
          if (receivedCN.newer(maxCN))
          {
            // We found a newer one
            this.maxCNs.remove(sid);
            this.maxCNs.put(sid, receivedCN);
          }
        }
        // Store the LDAP servers states
        Iterator<Short> sidIterator = msg.iterator();
        while (sidIterator.hasNext())
        {
          short sid = sidIterator.next();
          ServerState ss = msg.getLDAPServerState(sid);
          this.LDAPStates.put(sid, ss);
          this.approxFirstMissingDate.put(sid,
              msg.getApproxFirstMissingDate(sid));
        }
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        remoteMonitorResponsesSemaphore.release();
      }
      catch (Exception e)
      {
        // If an exception occurs while processing one of the expected message,
        // the processing is aborted and the waiting thread is awoke.
        remoteMonitorResponsesSemaphore.notifyAll();
      }
    }
    /**
     * Get the state of the LDAP server with the provided serverId.
     * @param serverId The server ID.
     * @return The server state.
     */
    public ServerState getServerState(short serverId)
    {
      return LDAPStates.get(serverId);
    }
    /**
     * Get the highest know change number of the LDAP server with the provided
     * serverId.
     * @param serverId The server ID.
     * @return The highest change number.
     */
    public ChangeNumber getMaxCN(short serverId)
    {
      return maxCNs.get(serverId);
    }
    /**
     * Get an approximation of the date of the oldest missing changes.
     * serverId.
     * @param serverId The server ID.
     * @return The approximation of the date of the oldest missing change.
     */
    public Long getApproxFirstMissingDate(short serverId)
    {
      return approxFirstMissingDate.get(serverId);
    }
    /**
     * Get the number of missing change for the server with the provided state.
     * @param state The provided server state.
     * @return The number of missing changes.
     */
    public int getMissingChanges(ServerState state)
    {
      // Traverse the max Cn transmitted by each server
      // For each server, get the highest CN know from the current server
      // Sum the difference betwenn the max and the last
      int missingChanges = 0;
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber maxCN = maxCNs.get(sid);
        ChangeNumber last = state.getMaxChangeNumber(sid);
        if (last == null)
        {
          last = new ChangeNumber(0,0, sid);
        }
        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
        missingChanges += missingChangesFromSID;
      }
      return missingChanges;
    }
}