mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
01.21.2008 05d24dcca61eed7921987a98bb94d94a4aa030cd
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -124,12 +125,12 @@
  /**
   * When this Handler is connected to a remote replication server
   * When this Handler is related to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   */
  private List<LightweightServerHandler>
     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
  private Map<Short, LightweightServerHandler> connectedServers =
    new ConcurrentHashMap<Short, LightweightServerHandler>();
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -200,6 +201,8 @@
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    long localGenerationId = -1;
    boolean handshakeOnly = false;
    try
    {
      if (baseDn != null)
@@ -244,6 +247,8 @@
        maxSendQueue = receivedMsg.getMaxSendQueue();
        heartbeatInterval = receivedMsg.getHeartbeatInterval();
        handshakeOnly = receivedMsg.isHandshakeOnly();
        // The session initiator decides whether to use SSL.
        sslEncryption = receivedMsg.getSSLEncryption();
@@ -524,60 +529,70 @@
      replicationServerDomain = replicationServer.
              getReplicationServerDomain(this.baseDn,true);
      boolean started;
      if (serverIsLDAPserver)
      if (!handshakeOnly)
      {
        started = replicationServerDomain.startServer(this);
      }
      else
      {
        started = replicationServerDomain.startReplicationServer(this);
      }
      if (started)
      {
        // sendWindow MUST be created before starting the writer
        sendWindow = new Semaphore(sendWindowSize);
        writer = new ServerWriter(session, serverId,
                this, replicationServerDomain);
        reader = new ServerReader(session, serverId,
                this, replicationServerDomain);
        reader.start();
        writer.start();
        // Create a thread to send heartbeat messages.
        if (heartbeatInterval > 0)
        boolean started;
        if (serverIsLDAPserver)
        {
          heartbeatThread = new HeartbeatThread(
              "replication Heartbeat to " + serverURL +
              " for " + this.baseDn,
              session, heartbeatInterval/3);
          heartbeatThread.start();
          started = replicationServerDomain.startServer(this);
        }
        else
        {
          started = replicationServerDomain.startReplicationServer(this);
        }
        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
        DirectoryServer.registerMonitorProvider(this);
      }
      else
      {
        // the connection is not valid, close it.
        try
        if (started)
        {
          if (debugEnabled())
          // sendWindow MUST be created before starting the writer
          sendWindow = new Semaphore(sendWindowSize);
          writer = new ServerWriter(session, serverId,
              this, replicationServerDomain);
          reader = new ServerReader(session, serverId,
              this, replicationServerDomain);
          reader.start();
          writer.start();
          // Create a thread to send heartbeat messages.
          if (heartbeatInterval > 0)
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS failed to start locally " +
              " the connection from serverID="+serverId);
            heartbeatThread = new HeartbeatThread(
                "replication Heartbeat to " + serverURL +
                " for " + this.baseDn,
                session, heartbeatInterval/3);
            heartbeatThread.start();
          }
          session.close();
        } catch (IOException e1)
        {
          // ignore
          DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
          DirectoryServer.registerMonitorProvider(this);
        }
        else
        {
          // the connection is not valid, close it.
          try
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                  replicationServerDomain.getReplicationServer().
                  getMonitorInstanceName() + " RS failed to start locally " +
                  " the connection from serverID="+serverId);
            }
            session.close();
          } catch (IOException e1)
          {
            // ignore
          }
        }
      }
      else
      {
        // For a hanshakeOnly connection, let's only create a reader
        // in order to detect the connection closure.
        reader = new ServerReader(session, serverId,
            this, replicationServerDomain);
        reader.start();
      }
    }
    catch (Exception e)
@@ -842,22 +857,22 @@
  /**
   * Get the age of the older change that has not yet been replicated
   * to the server handled by this ServerHandler.
   *
   * @return The age if the older change has not yet been replicated
   *         to the server handled by this ServerHandler.
   */
  public Long getApproxFirstMissingDate()
  {
    // Get the older CN received
    // From it, get the next sequence number
    // Get the CN for the next sequence number
    // If not present in the local RS db,
    // then approximate with the older update time
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return null;
    Long result = (long)0;
    return olderUpdateCN.getTime();
    // Get the older CN received
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN != null)
    {
      // If not present in the local RS db,
      // then approximate with the older update time
      result=olderUpdateCN.getTime();
    }
    return result;
  }
  /**
@@ -874,29 +889,82 @@
  /**
   * Get the older Change Number for that server.
   * Returns null when the queue is empty.
   * @return The older change number.
   */
  public ChangeNumber getOlderUpdateCN()
  {
    ChangeNumber result = null;
    synchronized (msgQueue)
    {
      if (isFollowing())
      {
        if (msgQueue.isEmpty())
          return null;
        UpdateMessage msg = msgQueue.first();
        return msg.getChangeNumber();
        {
          result=null;
        }
        else
        {
          UpdateMessage msg = msgQueue.first();
          result = msg.getChangeNumber();
        }
      }
      else
      {
        if (lateQueue.isEmpty())
          return null;
        {
          // isFollowing is false AND lateQueue is empty
          // We may be at the very moment when the writer has emptyed the
          // lateQueue when it sent the last update. The writer will fill again
          // the lateQueue when it will send the next update but we are not yet
          // there. So let's take the last change not sent directly from
          // the db.
        UpdateMessage msg = lateQueue.first();
        return msg.getChangeNumber();
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          try
          {
            // Build a list of candidates iterator (i.e. db i.e. server)
            for (short serverId : replicationServerDomain.getServers())
            {
              // get the last already sent CN from that server
              ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
              // get an iterator in this server db from that last change
              ReplicationIterator iterator =
                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
              // if that iterator has changes, then it is a candidate
              // it is added in the sorted list at a position given by its
              // current change (see ReplicationIteratorComparator).
              if ((iterator != null) && (iterator.getChange() != null))
              {
                iteratorSortedSet.add(iterator);
              }
            }
            UpdateMessage msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          }
          catch(Exception e)
          {
            result=null;
          }
          finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
          }
        }
        else
        {
          UpdateMessage msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
      }
    }
    return result;
  }
  /**
@@ -958,7 +1026,7 @@
       */
      while (msgQueue.size() > maxQueueSize)
      {
        following = false;
        setFollowing(false);
        msgQueue.removeFirst();
      }
    }
@@ -1083,6 +1151,13 @@
              }
            }
          }
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change accross all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
@@ -1107,7 +1182,7 @@
            {
              if (msgQueue.size() < maxQueueSize)
              {
                following = true;
                setFollowing(true);
              }
            }
          }
@@ -1119,7 +1194,7 @@
              if (msgQueue.contains(msg))
              {
                /* we finally catched up with the regular queue */
                following = true;
                setFollowing(true);
                lateQueue.clear();
                UpdateMessage msg1;
                do
@@ -1459,14 +1534,6 @@
      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
          getReplicationServer().getMonitorInstanceName()));
      // Add the oldest missing update
      Long olderUpdateTime = this.getApproxFirstMissingDate();
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    else
    {
@@ -1477,27 +1544,42 @@
    attributes.add(new Attribute("base-dn",
                                 baseDn.toString()));
    // Update stats
    // Retrieves the topology counters
    if (serverIsLDAPserver)
    {
      MonitorData md;
      try
      {
        replicationServerDomain.retrievesRemoteMonitorData();
        md = replicationServerDomain.getMonitorData();
        // Oldest missing update
        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
        if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
        {
          Date date = new Date(approxFirstMissingDate);
          attributes.add(new Attribute("approx-older-change-not-synchronized",
              date.toString()));
          attributes.add(
              new Attribute("approx-older-change-not-synchronized-millis",
                  String.valueOf(approxFirstMissingDate)));
        }
        // Missing changes
        long missingChanges = md.getMissingChanges(serverId);
        attributes.add(new Attribute("missing-changes",
            String.valueOf(missingChanges)));
        // Replication delay
        long delay = md.getApproxDelay(serverId);
        attributes.add(new Attribute("approximate-delay",
            String.valueOf(delay)));
      }
      catch(Exception e)
      {
        // FIXME: We failed retrieving the remote monitor data
        // TODO: improve the log
        // We failed retrieving the remote monitor data.
        attributes.add(new Attribute("error",
            stackTraceToSingleLineString(e)));
      }
      // Compute the latency for the current SH
      int missingChanges =
        replicationServerDomain.getMissingChanges(serverState);
      // add the latency attribute to our monitor data
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
    }
    // Deprecated
@@ -1532,8 +1614,6 @@
    attributes.add(new Attribute("waiting-changes",
        String.valueOf(getRcvMsgQueueSize())));
    // Age of oldest missing change
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Date of the oldest missing change
    long olderUpdateTime = getOlderUpdateTime();
@@ -1731,14 +1811,14 @@
     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
     synchronized(remoteLDAPservers)
     synchronized(connectedServers)
     {
       // Removes the existing structures
       for (LightweightServerHandler lsh : remoteLDAPservers)
       for (LightweightServerHandler lsh : connectedServers.values())
       {
         lsh.stopHandler();
       }
       remoteLDAPservers.clear();
       connectedServers.clear();
       // Creates the new structure according to the message received.
       for (String newConnectedServer : newRemoteLDAPservers)
@@ -1746,7 +1826,7 @@
         LightweightServerHandler lsh
         = new LightweightServerHandler(newConnectedServer, this);
         lsh.startHandler();
         remoteLDAPservers.add(lsh);
         connectedServers.put(lsh.getServerId(), lsh);
       }
     }
   }
@@ -1762,14 +1842,17 @@
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (LightweightServerHandler server : remoteLDAPservers)
     synchronized(connectedServers)
     {
       if (wantedServer == server.getServerId())
       for (LightweightServerHandler server : connectedServers.values())
       {
         return true;
         if (wantedServer == server.getServerId())
         {
           return true;
         }
       }
       return false;
     }
     return false;
   }
   /**
@@ -1781,7 +1864,7 @@
    */
   public boolean hasRemoteLDAPServers()
   {
     return !remoteLDAPservers.isEmpty();
     return !connectedServers.isEmpty();
   }
  /**
@@ -1907,4 +1990,13 @@
  {
    return this.replicationServerDomain;
  }
  /**
   * Return a Set containing the servers known by this replicationServer.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getConnectedServerIds()
  {
    return connectedServers.keySet();
  }
}