mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
01.21.2008 05d24dcca61eed7921987a98bb94d94a4aa030cd
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -129,8 +128,8 @@
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
  private long remoteMonitorDataLifeTime = 500;
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
@@ -139,21 +138,11 @@
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /* The date of the last time they have been elaborated */
  private long validityDate = 0;
  // For each LDAP server, its server state
  private HashMap<Short, ServerState> LDAPStates =
    new HashMap<Short, ServerState>();
  // For each LDAP server, the last CN it published
  private HashMap<Short, ChangeNumber> maxCNs =
    new HashMap<Short, ChangeNumber>();
  // For each LDAP server, an approximation of the date of the first missing
  // change
  private HashMap<Short, Long> approxFirstMissingDate =
    new HashMap<Short, Long>();
  /**
   * The monitor data consolidated over the topology.
   */
  private  MonitorData monitorData = new MonitorData();
  private  MonitorData wrkMonitorData;
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -166,13 +155,7 @@
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " Created Cache for " + baseDn + " " +
        stackTraceToSingleLineString(new Exception()));
}
  }
  /**
   * Add an update that has been received to the list of
@@ -366,6 +349,10 @@
        {
          replicationServers.remove(handler.getServerId());
          handler.stopHandler();
          // Update the remote replication servers with our list
          // of connected LDAP servers
          sendReplServerInfo();
        }
      }
      else
@@ -374,12 +361,12 @@
        {
          connectedServers.remove(handler.getServerId());
          handler.stopHandler();
          // Update the remote replication servers with our list
          // of connected LDAP servers
          sendReplServerInfo();
        }
      }
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
  }
  /**
@@ -578,7 +565,8 @@
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @return the created ReplicationIterator.
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getChangelogIterator(short serverId,
                    ChangeNumber changeNumber)
@@ -591,7 +579,8 @@
    {
      return handler.generateIterator(changeNumber);
    }
    catch (Exception e) {
    catch (Exception e)
    {
     return null;
    }
  }
@@ -759,6 +748,7 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (msg.getDestination() == this.replicationServer.getServerId())
@@ -779,20 +769,33 @@
              replServerMonitorRequestMsg.getDestination(),
              replServerMonitorRequestMsg.getsenderID());
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerState(this.getDbServerState());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (ServerHandler lsh : this.connectedServers.values())
        {
          monitorMsg.setLDAPServerState(
          monitorMsg.setServerState(
              lsh.getServerId(),
              lsh.getServerState(),
              lsh.getApproxFirstMissingDate());
              lsh.getApproxFirstMissingDate(),
              true);
        }
        // Same for the connected RS
        for (ServerHandler rsh : this.replicationServers.values())
        {
          monitorMsg.setServerState(
              rsh.getServerId(),
              rsh.getServerState(),
              rsh.getApproxFirstMissingDate(),
              false);
        }
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerDbState(this.getDbServerState());
        try
        {
          senderHandler.send(monitorMsg);
@@ -1305,118 +1308,135 @@
      }
    }
    /*
    /* =======================
     * Monitor Data generation
     * =======================
     */
    /**
     * Retrieves the remote monitor data.
     *
     * Retrieves the global monitor data.
     * @return The monitor data.
     * @throws DirectoryException When an error occurs.
     */
    protected void retrievesRemoteMonitorData()
    synchronized protected MonitorData getMonitorData()
      throws DirectoryException
    {
      if (validityDate > TimeThread.getTime())
      if (monitorData.getBuildDate() + monitorDataLifeTime
          > TimeThread.getTime())
      {
        // The current data are still valid. No need to renew them.
        return;
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
       // The current data are still valid. No need to renew them.
        // FIXME
        return null;
      }
      // Clean
      this.LDAPStates.clear();
      this.maxCNs.clear();
      // Init the maxCNs of our direct LDAP servers from our own dbstate
      for (ServerHandler rs : connectedServers.values())
      wrkMonitorData = new MonitorData();
      synchronized(wrkMonitorData)
      {
        short serverID = rs.getServerId();
        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
        if (cn == null)
        {
          // we have nothing in db for that server
          cn = new ChangeNumber(0, 0 , serverID);
        }
        this.maxCNs.put(serverID, cn);
      }
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Computing monitor data ");
      ServerState replServerState = this.getDbServerState();
      Iterator<Short> it = replServerState.iterator();
      while (it.hasNext())
      {
        short sid = it.next();
        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
        ChangeNumber maxCN = this.maxCNs.get(sid);
        if ((maxCN != null) && (receivedCN.newer(maxCN)))
        // Let's process our directly connected LSes
        // - in the ServerHandler for a given LS1, the stored state contains :
        //   - the max CN produced by LS1
        //   - the last CN consumed by LS1 from LS2..n
        // - in the RSdomain/dbHandler, the built-in state contains :
        //   - the max CN produced by each server
        // So for a given LS connected we can take the state and the max from
        // the LS/state.
        for (ServerHandler directlsh : connectedServers.values())
        {
          // We found a newer one
          this.maxCNs.remove(sid);
          this.maxCNs.put(sid, receivedCN);
          short serverID = directlsh.getServerId();
          // the state comes from the state stored in the SH
          ServerState directlshState = directlsh.getServerState().duplicate();
          // the max CN sent by that LS also comes from the SH
          ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
          if (maxcn == null)
          {
            // This directly connected LS has never produced any change
            maxcn = new ChangeNumber(0, 0 , serverID);
          }
          wrkMonitorData.setMaxCN(serverID, maxcn);
          wrkMonitorData.setLDAPServerState(serverID, directlshState);
          wrkMonitorData.setFirstMissingDate(serverID, directlsh.
                                             getApproxFirstMissingDate());
        }
        // Then initialize the max CN for the LS that produced something
        // - from our own local db state
        // - whatever they are directly or undirectly connected
        ServerState dbServerState = getDbServerState();
        Iterator<Short> it = dbServerState.iterator();
        while (it.hasNext())
        {
          short sid = it.next();
          ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
          wrkMonitorData.setMaxCN(sid, storedCN);
        }
        // Now we have used all available local informations
        // and we need the remote ones.
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Local monitor data: " +
            wrkMonitorData.toString());
      }
      // Send Request to the other Replication Servers
      if (remoteMonitorResponsesSemaphore == null)
      {
        remoteMonitorResponsesSemaphore = new Semaphore(
            replicationServers.size() -1);
        sendMonitorDataRequest();
        remoteMonitorResponsesSemaphore = new Semaphore(0);
        short requestCnt = sendMonitorDataRequest();
        // Wait reponses from them or timeout
        waitMonitorDataResponses(replicationServers.size());
        waitMonitorDataResponses(requestCnt);
      }
      else
      {
        // The processing of renewing the monitor cache is already running
        // We'll make it sleeping until the end
        // TODO: unit test for this case.
        while (remoteMonitorResponsesSemaphore!=null)
        {
          waitMonitorDataResponses(1);
        }
      }
      // Now we have the expected answers of an error occured
      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
      wrkMonitorData.completeComputing();
      if (debugEnabled())
      // Store the new computed data as the reference
      synchronized(monitorData)
      {
        debugMonitorData();
      }
    }
    private void debugMonitorData()
    {
      String mds = " Monitor data=";
      Iterator<Short> ite = LDAPStates.keySet().iterator();
      while (ite.hasNext())
      {
        Short sid = ite.next();
        ServerState ss = LDAPStates.get(sid);
        mds += " LDAPState(" + sid + ")=" + ss.toString();
      }
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber cn = maxCNs.get(sid);
        mds += " maxCNs(" + sid + ")=" + cn.toString();
      }
      mds += "--";
      TRACER.debugInfo(
        // Now we have the expected answers or an error occured
        monitorData = wrkMonitorData;
        wrkMonitorData = null;
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          mds);
          " baseDn=" + baseDn + " *** Computed MonitorData: " +
          monitorData.toString());
      }
      return monitorData;
    }
    /**
     * Sends a MonitorRequest message to all connected RS.
     * @return the number of requests sent.
     * @throws DirectoryException when a problem occurs.
     */
    protected void sendMonitorDataRequest()
    protected short sendMonitorDataRequest()
      throws DirectoryException
    {
      short sent=0;
      try
      {
        for (ServerHandler rs : replicationServers.values())
@@ -1425,6 +1445,7 @@
            MonitorRequestMessage(this.replicationServer.getServerId(),
              rs.getServerId());
          rs.send(msg);
          sent++;
        }
      }
      catch(Exception e)
@@ -1434,6 +1455,7 @@
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
      return sent;
    }
    /**
@@ -1446,21 +1468,30 @@
    {
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn +
          " waiting for " + expectedResponses
          + " expected monitor messages");
        boolean allPermitsAcquired =
          remoteMonitorResponsesSemaphore.tryAcquire(
              expectedResponses,
              (long) 500, TimeUnit.MILLISECONDS);
              (long) 5000, TimeUnit.MILLISECONDS);
        if (!allPermitsAcquired)
        {
          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
          // FIXME let's go on in best effort even with limited data received.
        }
        else
        {
          if (debugEnabled())
            TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Successfully received all " + replicationServers.size()
            " baseDn=" + baseDn +
            " Successfully received all " + expectedResponses
            + " expected monitor messages");
        }
      }
@@ -1482,48 +1513,94 @@
     */
    public void receivesMonitorDataResponse(MonitorMessage msg)
    {
      if (debugEnabled())
        TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        "Receiving " + msg + " from " + msg.getsenderID() +
        remoteMonitorResponsesSemaphore);
      if (remoteMonitorResponsesSemaphore == null)
      {
        // Ignoring the remote monitor data because an error occured previously
        // FIXME
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Receiving " + msg + " from " + msg.getsenderID() +
            " remoteMonitorResponsesSemaphore should not be null"));
        // Ignoring the remote monitor data because an error occured
        // previously
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN accross the RSes
        ServerState replServerState = msg.getReplServerState();
        Iterator<Short> it = replServerState.iterator();
        while (it.hasNext())
        synchronized(wrkMonitorData)
        {
          short sid = it.next();
          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
          ChangeNumber maxCN = this.maxCNs.get(sid);
          if (receivedCN.newer(maxCN))
          {
            // We found a newer one
            this.maxCNs.remove(sid);
            this.maxCNs.put(sid, receivedCN);
          }
        }
          // Here is the RS state : list <serverID, lastChangeNumber>
          // For each LDAP Server, we keep the max CN accross the RSes
          ServerState replServerState = msg.getReplServerDbState();
          wrkMonitorData.setMaxCNs(replServerState);
        // Store the LDAP servers states
        Iterator<Short> sidIterator = msg.iterator();
        while (sidIterator.hasNext())
        {
          short sid = sidIterator.next();
          ServerState ss = msg.getLDAPServerState(sid);
          this.LDAPStates.put(sid, ss);
          this.approxFirstMissingDate.put(sid,
              msg.getApproxFirstMissingDate(sid));
          // Store the remote LDAP servers states
          Iterator<Short> lsidIterator = msg.ldapIterator();
          while (lsidIterator.hasNext())
          {
            short sid = lsidIterator.next();
            wrkMonitorData.setLDAPServerState(sid,
                msg.getLDAPServerState(sid).duplicate());
            wrkMonitorData.setFirstMissingDate(sid,
                msg.getLDAPApproxFirstMissingDate(sid));
          }
          // Process the latency reported by the remote RSi on its connections
          // to the other RSes
          Iterator<Short> rsidIterator = msg.rsIterator();
          while (rsidIterator.hasNext())
          {
            short rsid = rsidIterator.next();
            if (rsid == replicationServer.getServerId())
            {
              // this is the latency of the remote RSi regarding the current RS
              // let's update the fmd of my connected LS
              for (ServerHandler connectedlsh : connectedServers.values())
              {
                short connectedlsid = connectedlsh.getServerId();
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
              }
            }
            else
            {
              // this is the latency of the remote RSi regarding another RSj
              // let's update the latency of the LSes connected to RSj
              ServerHandler rsjHdr = replicationServers.get(rsid);
              for(short remotelsid : rsjHdr.getConnectedServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
              }
            }
          }
          if (debugEnabled())
          {
            if (debugEnabled())
              TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " baseDn=" + baseDn +
              " Processed msg from " + msg.getsenderID() +
              " New monitor data: " + wrkMonitorData.toString());
          }
        }
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        remoteMonitorResponsesSemaphore.release();
      }
      catch (Exception e)
      {
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
            stackTraceToSingleLineString(e)));
        // If an exception occurs while processing one of the expected message,
        // the processing is aborted and the waiting thread is awoke.
        remoteMonitorResponsesSemaphore.notifyAll();
@@ -1531,65 +1608,6 @@
    }
    /**
     * Get the state of the LDAP server with the provided serverId.
     * @param serverId The server ID.
     * @return The server state.
     */
    public ServerState getServerState(short serverId)
    {
      return LDAPStates.get(serverId);
    }
    /**
     * Get the highest know change number of the LDAP server with the provided
     * serverId.
     * @param serverId The server ID.
     * @return The highest change number.
     */
    public ChangeNumber getMaxCN(short serverId)
    {
      return maxCNs.get(serverId);
    }
    /**
     * Get an approximation of the date of the oldest missing changes.
     * serverId.
     * @param serverId The server ID.
     * @return The approximation of the date of the oldest missing change.
     */
    public Long getApproxFirstMissingDate(short serverId)
    {
      return approxFirstMissingDate.get(serverId);
    }
    /**
     * Get the number of missing change for the server with the provided state.
     * @param state The provided server state.
     * @return The number of missing changes.
     */
    public int getMissingChanges(ServerState state)
    {
      // Traverse the max Cn transmitted by each server
      // For each server, get the highest CN know from the current server
      // Sum the difference betwenn the max and the last
      int missingChanges = 0;
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber maxCN = maxCNs.get(sid);
        ChangeNumber last = state.getMaxChangeNumber(sid);
        if (last == null)
        {
          last = new ChangeNumber(0,0, sid);
        }
        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
        missingChanges += missingChangesFromSID;
      }
      return missingChanges;
    }
    /**
     * Set the purge delay on all the db Handlers for this Domain
     * of Replicaiton.
     *