mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
03.41.2008 2942eaa1b7264228c9ca7535aabd206e663581e9
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -62,9 +36,9 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -150,11 +124,12 @@
  /**
   * When this Handler is connected to a changelog server this collection
   * will contain the list of LDAP servers connected to the remote changelog
   * server.
   * When this Handler is connected to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   */
  private List<String> remoteLDAPservers = new ArrayList<String>();
  private List<LightweightServerHandler>
     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
       ServerState dbState = replicationServerDomain.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
         if (currentChange != null)
         {
           int current = currentChange.getSeqnum();
           if (current == max)
           {
           }
           else if (current < max)
           {
             totalCount += max - current;
           }
           else
           {
             totalCount += Integer.MAX_VALUE - (current - max) + 1;
           }
         }
         else
         {
           totalCount += max;
         }
         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
             serverState.getMaxChangeNumber(id));
       }
       return totalCount;
     }
@@ -858,7 +814,7 @@
  }
  /**
   * Get an approximation of the delay by looking at the age of the odest
   * Get an approximation of the delay by looking at the age of the oldest
   * message that has not been sent to this server.
   * This is an approximation because the age is calculated using the
   * clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
   * @return The age if the older change has not yet been replicated
   *         to the server handled by this ServerHandler.
   */
  public Long getApproxFirstMissingDate()
  {
    // Get the older CN received
    // From it, get the next sequence number
    // Get the CN for the next sequence number
    // If not present in the local RS db,
    // then approximate with the older update time
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return null;
    ReplicationIterator ri =
      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
    if (ri != null)
    {
      if (ri.next())
      {
        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
        return firstMissingChange.getTime();
      }
    }
    return olderUpdateCN.getTime();
  }
  /**
   * Get the older update time for that server.
   * @return The older update time.
   */
  public long getOlderUpdateTime()
  {
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return 0;
    return  olderUpdateCN.getTime();
  }
  /**
   * Get the older Change Number for that server.
   * @return The older change number.
   */
  public ChangeNumber getOlderUpdateCN()
  {
    synchronized (msgQueue)
    {
      if (isFollowing())
      {
        if (msgQueue.isEmpty())
          return 0;
          return null;
        UpdateMessage msg = msgQueue.first();
        return msg.getChangeNumber().getTime();
        return msg.getChangeNumber();
      }
      else
      {
        if (lateQueue.isEmpty())
          return 0;
          return null;
        UpdateMessage msg = lateQueue.first();
        return msg.getChangeNumber().getTime();
        return msg.getChangeNumber();
      }
    }
  }
@@ -1190,6 +1186,16 @@
  }
  /**
   * Get the state of this server.
   *
   * @return ServerState the state for this server..
   */
  public ServerState getServerState()
  {
    return serverState;
  }
  /**
   * Stop this server handler processing.
   */
  public void stopHandler()
@@ -1397,7 +1403,7 @@
                 " " + serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
      return "Remote LDAP Server " + str;
      return "Direct LDAP Server " + str;
    else
      return "Remote Repl Server " + str;
  }
@@ -1445,28 +1451,68 @@
  {
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    if (serverIsLDAPserver)
    {
      attributes.add(new Attribute("LDAP-Server", serverURL));
      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
          getReplicationServer().getMonitorInstanceName()));
      // Add the oldest missing update
      Long olderUpdateTime = this.getApproxFirstMissingDate();
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    else
    {
      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
    }
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("max-waiting-changes",
                                 String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-waiting-acks",
                                 String.valueOf(getWaitingAckSize())));
    // Update stats
    // Retrieves the topology counters
    if (serverIsLDAPserver)
    {
      try
      {
        replicationServerDomain.retrievesRemoteMonitorData();
      }
      catch(Exception e)
      {
        // FIXME: We failed retrieving the remote monitor data
      }
      // Compute the latency for the current SH
      int missingChanges =
        replicationServerDomain.getMissingChanges(serverState);
      // add the latency attribute to our monitor data
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
    }
    // Deprecated
    // attributes.add(new Attribute("max-waiting-changes",
    //                              String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-sent",
                                 String.valueOf(getOutCount())));
    attributes.add(new Attribute("update-received",
                                 String.valueOf(getInCount())));
    // Deprecated as long as assured is not exposed
    attributes.add(new Attribute("update-waiting-acks",
        String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
    attributes.add(new Attribute("ack-received",
                                 String.valueOf(getInAckCount())));
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Window stats
    attributes.add(new Attribute("max-send-window",
                                 String.valueOf(sendWindowSize)));
    attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
                                 String.valueOf(maxRcvWindow)));
    attributes.add(new Attribute("current-rcv-window",
                                 String.valueOf(rcvWindow)));
    /*
     * FIXME:PGB DEPRECATED
     *
    // Missing changes
    attributes.add(new Attribute("waiting-changes",
        String.valueOf(getRcvMsgQueueSize())));
    // Age of oldest missing change
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Date of the oldest missing change
    long olderUpdateTime = getOlderUpdateTime();
    if (olderUpdateTime != 0)
    {
@@ -1482,6 +1540,7 @@
      attributes.add(new Attribute("older-change-not-synchronized",
                                 String.valueOf(date.toString())));
    }
    */
    /* get the Server State */
    final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
    Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
    attributes.add(attr);
    // Encryption
    attributes.add(new Attribute("ssl-encryption",
        String.valueOf(session.isEncrypted())));
    // Data generation
    attributes.add(new Attribute("generation-id",
        String.valueOf(generationId)));
@@ -1663,8 +1724,28 @@
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
     remoteLDAPservers = infoMsg.getConnectedServers();
     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
     synchronized(remoteLDAPservers)
     {
       // Removes the existing structures
       for (LightweightServerHandler lsh : remoteLDAPservers)
       {
         lsh.stopHandler();
       }
       remoteLDAPservers.clear();
       // Creates the new structure according to the message received.
       for (String newConnectedServer : newRemoteLDAPservers)
       {
         LightweightServerHandler lsh
         = new LightweightServerHandler(newConnectedServer, this);
         lsh.startHandler();
         remoteLDAPservers.add(lsh);
       }
     }
   }
   /**
@@ -1678,9 +1759,9 @@
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (String server : remoteLDAPservers)
     for (LightweightServerHandler server : remoteLDAPservers)
     {
       if (wantedServer == Short.valueOf(server))
       if (wantedServer == server.getServerId())
       {
         return true;
       }
@@ -1695,9 +1776,9 @@
    * @return boolean True is the replication server has remote LDAP servers
    * connected to it.
    */
   public List<String> getRemoteLDAPServers()
   public boolean hasRemoteLDAPServers()
   {
     return remoteLDAPservers;
     return !remoteLDAPservers.isEmpty();
   }
  /**
@@ -1802,4 +1883,14 @@
  {
    this.generationId = generationId;
  }
  /**
   * Returns the Replication Server Domain to which belongs this server handler.
   *
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain()
  {
    return this.replicationServerDomain;
  }
}