From 2942eaa1b7264228c9ca7535aabd206e663581e9 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 03 Jan 2008 09:41:49 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |  235 ++++++++++++++++++++++++++++++++++++++++------------------
 1 files changed, 163 insertions(+), 72 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 2c4e3ba..cd499e3 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
- */
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License").  You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- *      Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -62,9 +36,9 @@
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -150,11 +124,12 @@
 
 
   /**
-   * When this Handler is connected to a changelog server this collection
-   * will contain the list of LDAP servers connected to the remote changelog
-   * server.
+   * When this Handler is connected to a remote replication server
+   * this collection will contain as many elements as there are
+   * LDAP servers connected to the remote replication server.
    */
-  private List<String> remoteLDAPservers = new ArrayList<String>();
+  private List<LightweightServerHandler>
+     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
 
   /**
    * The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
        ServerState dbState = replicationServerDomain.getDbServerState();
        for (short id : dbState)
        {
-         int max = dbState.getMaxChangeNumber(id).getSeqnum();
-         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
-         if (currentChange != null)
-         {
-           int current = currentChange.getSeqnum();
-           if (current == max)
-           {
-           }
-           else if (current < max)
-           {
-             totalCount += max - current;
-           }
-           else
-           {
-             totalCount += Integer.MAX_VALUE - (current - max) + 1;
-           }
-         }
-         else
-         {
-           totalCount += max;
-         }
+         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
+             serverState.getMaxChangeNumber(id));
        }
        return totalCount;
      }
@@ -858,7 +814,7 @@
   }
 
   /**
-   * Get an approximation of the delay by looking at the age of the odest
+   * Get an approximation of the delay by looking at the age of the oldest
    * message that has not been sent to this server.
    * This is an approximation because the age is calculated using the
    * clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
    * @return The age if the older change has not yet been replicated
    *         to the server handled by this ServerHandler.
    */
+  public Long getApproxFirstMissingDate()
+  {
+    // Get the older CN received
+    // From it, get the next sequence number
+    // Get the CN for the next sequence number
+    // If not present in the local RS db,
+    // then approximate with the older update time
+    ChangeNumber olderUpdateCN = getOlderUpdateCN();
+    if (olderUpdateCN == null)
+      return null;
+
+    ReplicationIterator ri =
+      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
+    if (ri != null)
+    {
+      if (ri.next())
+      {
+        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
+        return firstMissingChange.getTime();
+      }
+    }
+    return olderUpdateCN.getTime();
+  }
+
+  /**
+   * Get the older update time for that server.
+   * @return The older update time.
+   */
   public long getOlderUpdateTime()
   {
+    ChangeNumber olderUpdateCN = getOlderUpdateCN();
+    if (olderUpdateCN == null)
+      return 0;
+    return  olderUpdateCN.getTime();
+  }
+
+  /**
+   * Get the older Change Number for that server.
+   * @return The older change number.
+   */
+  public ChangeNumber getOlderUpdateCN()
+  {
     synchronized (msgQueue)
     {
       if (isFollowing())
       {
         if (msgQueue.isEmpty())
-          return 0;
+          return null;
 
         UpdateMessage msg = msgQueue.first();
-        return msg.getChangeNumber().getTime();
+        return msg.getChangeNumber();
       }
       else
       {
         if (lateQueue.isEmpty())
-          return 0;
+          return null;
 
         UpdateMessage msg = lateQueue.first();
-        return msg.getChangeNumber().getTime();
+        return msg.getChangeNumber();
       }
     }
   }
@@ -1190,6 +1186,16 @@
   }
 
   /**
+   * Get the state of this server.
+   *
+   * @return ServerState the state for this server..
+   */
+  public ServerState getServerState()
+  {
+    return serverState;
+  }
+
+  /**
    * Stop this server handler processing.
    */
   public void stopHandler()
@@ -1397,7 +1403,7 @@
                  " " + serverURL + " " + String.valueOf(serverId);
 
     if (serverIsLDAPserver)
-      return "Remote LDAP Server " + str;
+      return "Direct LDAP Server " + str;
     else
       return "Remote Repl Server " + str;
   }
@@ -1445,28 +1451,68 @@
   {
     ArrayList<Attribute> attributes = new ArrayList<Attribute>();
     if (serverIsLDAPserver)
+    {
       attributes.add(new Attribute("LDAP-Server", serverURL));
+      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
+          getReplicationServer().getMonitorInstanceName()));
+
+      // Add the oldest missing update
+      Long olderUpdateTime = this.getApproxFirstMissingDate();
+      if (olderUpdateTime != null)
+      {
+        Date date = new Date(olderUpdateTime);
+        attributes.add(new Attribute("approx-older-change-not-synchronized",
+          date.toString()));
+      }
+    }
     else
+    {
       attributes.add(new Attribute("ReplicationServer-Server", serverURL));
+    }
     attributes.add(new Attribute("server-id",
                                  String.valueOf(serverId)));
     attributes.add(new Attribute("base-dn",
                                  baseDn.toString()));
-    attributes.add(new Attribute("waiting-changes",
-                                 String.valueOf(getRcvMsgQueueSize())));
-    attributes.add(new Attribute("max-waiting-changes",
-                                 String.valueOf(maxQueueSize)));
-    attributes.add(new Attribute("update-waiting-acks",
-                                 String.valueOf(getWaitingAckSize())));
+
+    // Update stats
+
+    // Retrieves the topology counters
+    if (serverIsLDAPserver)
+    {
+      try
+      {
+        replicationServerDomain.retrievesRemoteMonitorData();
+      }
+      catch(Exception e)
+      {
+        // FIXME: We failed retrieving the remote monitor data
+      }
+
+      // Compute the latency for the current SH
+      int missingChanges =
+        replicationServerDomain.getMissingChanges(serverState);
+
+      // add the latency attribute to our monitor data
+      attributes.add(new Attribute("missing-changes",
+          String.valueOf(missingChanges)));
+    }
+
+    // Deprecated
+    // attributes.add(new Attribute("max-waiting-changes",
+    //                              String.valueOf(maxQueueSize)));
     attributes.add(new Attribute("update-sent",
                                  String.valueOf(getOutCount())));
     attributes.add(new Attribute("update-received",
                                  String.valueOf(getInCount())));
+
+    // Deprecated as long as assured is not exposed
+    attributes.add(new Attribute("update-waiting-acks",
+        String.valueOf(getWaitingAckSize())));
     attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
     attributes.add(new Attribute("ack-received",
                                  String.valueOf(getInAckCount())));
-    attributes.add(new Attribute("approximate-delay",
-                                 String.valueOf(getApproxDelay())));
+
+    // Window stats
     attributes.add(new Attribute("max-send-window",
                                  String.valueOf(sendWindowSize)));
     attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
                                  String.valueOf(maxRcvWindow)));
     attributes.add(new Attribute("current-rcv-window",
                                  String.valueOf(rcvWindow)));
+
+    /*
+     * FIXME:PGB DEPRECATED
+     *
+    // Missing changes
+    attributes.add(new Attribute("waiting-changes",
+        String.valueOf(getRcvMsgQueueSize())));
+    // Age of oldest missing change
+    attributes.add(new Attribute("approximate-delay",
+                                 String.valueOf(getApproxDelay())));
+
+    // Date of the oldest missing change
     long olderUpdateTime = getOlderUpdateTime();
     if (olderUpdateTime != 0)
     {
@@ -1482,6 +1540,7 @@
       attributes.add(new Attribute("older-change-not-synchronized",
                                  String.valueOf(date.toString())));
     }
+    */
 
     /* get the Server State */
     final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
     Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
     attributes.add(attr);
 
+    // Encryption
     attributes.add(new Attribute("ssl-encryption",
         String.valueOf(session.isEncrypted())));
 
+    // Data generation
     attributes.add(new Attribute("generation-id",
         String.valueOf(generationId)));
 
@@ -1663,8 +1724,28 @@
            getMonitorInstanceName() +
            " SH for remote server " + this.getMonitorInstanceName() +
            " sets replServerInfo " + "<" + infoMsg + ">");
-     remoteLDAPservers = infoMsg.getConnectedServers();
+
+     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
      generationId = infoMsg.getGenerationId();
+
+     synchronized(remoteLDAPservers)
+     {
+       // Removes the existing structures
+       for (LightweightServerHandler lsh : remoteLDAPservers)
+       {
+         lsh.stopHandler();
+       }
+       remoteLDAPservers.clear();
+
+       // Creates the new structure according to the message received.
+       for (String newConnectedServer : newRemoteLDAPservers)
+       {
+         LightweightServerHandler lsh
+         = new LightweightServerHandler(newConnectedServer, this);
+         lsh.startHandler();
+         remoteLDAPservers.add(lsh);
+       }
+     }
    }
 
    /**
@@ -1678,9 +1759,9 @@
     */
    public boolean isRemoteLDAPServer(short wantedServer)
    {
-     for (String server : remoteLDAPservers)
+     for (LightweightServerHandler server : remoteLDAPservers)
      {
-       if (wantedServer == Short.valueOf(server))
+       if (wantedServer == server.getServerId())
        {
          return true;
        }
@@ -1695,9 +1776,9 @@
     * @return boolean True is the replication server has remote LDAP servers
     * connected to it.
     */
-   public List<String> getRemoteLDAPServers()
+   public boolean hasRemoteLDAPServers()
    {
-     return remoteLDAPservers;
+     return !remoteLDAPservers.isEmpty();
    }
 
   /**
@@ -1802,4 +1883,14 @@
   {
     this.generationId = generationId;
   }
+
+  /**
+   * Returns the Replication Server Domain to which belongs this server handler.
+   *
+   * @return The replication server domain.
+   */
+  public ReplicationServerDomain getDomain()
+  {
+    return this.replicationServerDomain;
+  }
 }

--
Gitblit v1.10.0