From 2942eaa1b7264228c9ca7535aabd206e663581e9 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 03 Jan 2008 09:41:49 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  379 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 372 insertions(+), 7 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 7eadd3e..6382ca3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
 import org.opends.server.replication.protocol.RoutableMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.MonitorMessage;
+import org.opends.server.replication.protocol.MonitorRequestMessage;
 import org.opends.server.replication.protocol.ResetGenerationId;
 import org.opends.server.types.DN;
-
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
 
 /**
@@ -118,6 +128,34 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /* Monitor data management */
+
+  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
+  private long remoteMonitorDataLifeTime = 500;
+
+  /* Search op on monitor data is processed by a worker thread.
+   * Requests are sent to the other RS,and responses are received by the
+   * listener threads.
+   * The worker thread is awoke on this semaphore, or on timeout.
+   */
+  Semaphore remoteMonitorResponsesSemaphore;
+
+  /* The date of the last time they have been elaborated */
+  private long validityDate = 0;
+
+  // For each LDAP server, its server state
+  private HashMap<Short, ServerState> LDAPStates =
+    new HashMap<Short, ServerState>();
+
+  // For each LDAP server, the last CN it published
+  private HashMap<Short, ChangeNumber> maxCNs =
+    new HashMap<Short, ChangeNumber>();
+
+  // For each LDAP server, an approximation of the date of the first missing
+  // change
+  private HashMap<Short, Long> approxFirstMissingDate =
+    new HashMap<Short, Long>();
+
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
    *
@@ -352,7 +390,7 @@
         }
         else
         {
-          if (!rsh.getRemoteLDAPServers().isEmpty())
+          if (rsh.hasRemoteLDAPServers())
           {
             lDAPServersConnectedInTheTopology = true;
 
@@ -636,7 +674,7 @@
         // server connected
         for (ServerHandler rsh : replicationServers.values())
         {
-          if (!rsh.getRemoteLDAPServers().isEmpty())
+          if (rsh.hasRemoteLDAPServers())
           {
             servers.add(rsh);
           }
@@ -693,15 +731,58 @@
    */
   public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
-    // A replication server is not expected to be the destination
-    // of a routable message except for an error message.
+    // Test the message for which a ReplicationServer is expected
+    // to be the destination
     if (msg.getDestination() == this.replicationServer.getServerId())
     {
       if (msg instanceof ErrorMessage)
       {
         ErrorMessage errorMsg = (ErrorMessage)msg;
         logError(ERR_ERROR_MSG_RECEIVED.get(
-                   errorMsg.getDetails()));
+            errorMsg.getDetails()));
+      }
+      else if (msg instanceof MonitorRequestMessage)
+      {
+        MonitorRequestMessage replServerMonitorRequestMsg =
+          (MonitorRequestMessage) msg;
+
+        MonitorMessage monitorMsg =
+          new MonitorMessage(
+              replServerMonitorRequestMsg.getDestination(),
+              replServerMonitorRequestMsg.getsenderID());
+
+        // Populate the RS state in the msg from the DbState
+        monitorMsg.setReplServerState(this.getDbServerState());
+
+        // Populate for each connected LDAP Server
+        // from the states stored in the serverHandler.
+        // - the server state
+        // - the older missing change
+        for (ServerHandler lsh : this.connectedServers.values())
+        {
+          monitorMsg.setLDAPServerState(
+              lsh.getServerId(),
+              lsh.getServerState(),
+              lsh.getApproxFirstMissingDate());
+        }
+        try
+        {
+          senderHandler.send(monitorMsg);
+        }
+        catch(Exception e)
+        {
+          // We log the error. The requestor will detect a timeout or
+          // any other failure on the connection.
+          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+              Short.toString((msg.getDestination()))));
+        }
+      }
+      else if (msg instanceof MonitorMessage)
+      {
+        MonitorMessage monitorMsg =
+          (MonitorMessage) msg;
+
+        receivesMonitorDataResponse(monitorMsg);
       }
       else
       {
@@ -1156,4 +1237,288 @@
     {
       return replicationServer;
     }
+
+    /*
+     * Monitor Data generation
+     */
+
+    /**
+     * Retrieves the remote monitor data.
+     *
+     * @throws DirectoryException When an error occurs.
+     */
+    protected void retrievesRemoteMonitorData()
+      throws DirectoryException
+    {
+      if (validityDate > TimeThread.getTime())
+      {
+        // The current data are still valid. No need to renew them.
+        return;
+      }
+
+      // Clean
+      this.LDAPStates.clear();
+      this.maxCNs.clear();
+
+      // Init the maxCNs of our direct LDAP servers from our own dbstate
+      for (ServerHandler rs : connectedServers.values())
+      {
+        short serverID = rs.getServerId();
+        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
+        if (cn == null)
+        {
+          // we have nothing in db for that server
+          cn = new ChangeNumber(0, 0 , serverID);
+        }
+        this.maxCNs.put(serverID, cn);
+      }
+
+      ServerState replServerState = this.getDbServerState();
+      Iterator<Short> it = replServerState.iterator();
+      while (it.hasNext())
+      {
+        short sid = it.next();
+        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+        ChangeNumber maxCN = this.maxCNs.get(sid);
+        if ((maxCN != null) && (receivedCN.newer(maxCN)))
+        {
+          // We found a newer one
+          this.maxCNs.remove(sid);
+          this.maxCNs.put(sid, receivedCN);
+        }
+      }
+
+      // Send Request to the other Replication Servers
+      if (remoteMonitorResponsesSemaphore == null)
+      {
+        remoteMonitorResponsesSemaphore = new Semaphore(
+            replicationServers.size() -1);
+
+        sendMonitorDataRequest();
+
+        // Wait reponses from them or timeout
+        waitMonitorDataResponses(replicationServers.size());
+      }
+      else
+      {
+        // The processing of renewing the monitor cache is already running
+        // We'll make it sleeping until the end
+        while (remoteMonitorResponsesSemaphore!=null)
+        {
+          waitMonitorDataResponses(1);
+        }
+      }
+
+      // Now we have the expected answers of an error occured
+      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+
+      if (debugEnabled())
+      {
+        debugMonitorData();
+      }
+    }
+
+    private void debugMonitorData()
+    {
+      String mds = " Monitor data=";
+      Iterator<Short> ite = LDAPStates.keySet().iterator();
+      while (ite.hasNext())
+      {
+        Short sid = ite.next();
+        ServerState ss = LDAPStates.get(sid);
+        mds += " LDAPState(" + sid + ")=" + ss.toString();
+      }
+      Iterator<Short> itc = maxCNs.keySet().iterator();
+      while (itc.hasNext())
+      {
+        Short sid = itc.next();
+        ChangeNumber cn = maxCNs.get(sid);
+        mds += " maxCNs(" + sid + ")=" + cn.toString();
+      }
+
+      mds += "--";
+      TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDN=" + baseDn +
+          mds);
+    }
+
+    /**
+     * Sends a MonitorRequest message to all connected RS.
+     * @throws DirectoryException when a problem occurs.
+     */
+    protected void sendMonitorDataRequest()
+      throws DirectoryException
+    {
+      try
+      {
+        for (ServerHandler rs : replicationServers.values())
+        {
+          MonitorRequestMessage msg = new
+            MonitorRequestMessage(this.replicationServer.getServerId(),
+              rs.getServerId());
+          rs.send(msg);
+        }
+      }
+      catch(Exception e)
+      {
+        Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
+        logError(message);
+        throw new DirectoryException(ResultCode.OTHER,
+            message, e);
+      }
+    }
+
+    /**
+     * Wait for the expected count of received MonitorMessage.
+     * @param expectedResponses The number of expected answers.
+     * @throws DirectoryException When an error occurs.
+     */
+    protected void waitMonitorDataResponses(int expectedResponses)
+      throws DirectoryException
+    {
+      try
+      {
+        boolean allPermitsAcquired =
+          remoteMonitorResponsesSemaphore.tryAcquire(
+              expectedResponses,
+              (long) 500, TimeUnit.MILLISECONDS);
+
+        if (!allPermitsAcquired)
+        {
+          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+        }
+        else
+        {
+          if (debugEnabled())
+            TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            "Successfully received all " + replicationServers.size()
+            + " expected monitor messages");
+        }
+      }
+      catch(Exception e)
+      {
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+      }
+      finally
+      {
+        remoteMonitorResponsesSemaphore = null;
+      }
+    }
+
+    /**
+     * Processes a Monitor message receives from a remote Replication Server
+     * and stores the data received.
+     *
+     * @param msg The message to be processed.
+     */
+    public void receivesMonitorDataResponse(MonitorMessage msg)
+    {
+      if (remoteMonitorResponsesSemaphore == null)
+      {
+        // Ignoring the remote monitor data because an error occured previously
+        return;
+      }
+
+      try
+      {
+        // Here is the RS state : list <serverID, lastChangeNumber>
+        // For each LDAP Server, we keep the max CN accross the RSes
+        ServerState replServerState = msg.getReplServerState();
+        Iterator<Short> it = replServerState.iterator();
+        while (it.hasNext())
+        {
+          short sid = it.next();
+          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+          ChangeNumber maxCN = this.maxCNs.get(sid);
+          if (receivedCN.newer(maxCN))
+          {
+            // We found a newer one
+            this.maxCNs.remove(sid);
+            this.maxCNs.put(sid, receivedCN);
+          }
+        }
+
+        // Store the LDAP servers states
+        Iterator<Short> sidIterator = msg.iterator();
+        while (sidIterator.hasNext())
+        {
+          short sid = sidIterator.next();
+          ServerState ss = msg.getLDAPServerState(sid);
+          this.LDAPStates.put(sid, ss);
+          this.approxFirstMissingDate.put(sid,
+              msg.getApproxFirstMissingDate(sid));
+        }
+
+        // Decreases the number of expected responses and potentially
+        // wakes up the waiting requestor thread.
+        remoteMonitorResponsesSemaphore.release();
+      }
+      catch (Exception e)
+      {
+        // If an exception occurs while processing one of the expected message,
+        // the processing is aborted and the waiting thread is awoke.
+        remoteMonitorResponsesSemaphore.notifyAll();
+      }
+    }
+
+    /**
+     * Get the state of the LDAP server with the provided serverId.
+     * @param serverId The server ID.
+     * @return The server state.
+     */
+    public ServerState getServerState(short serverId)
+    {
+      return LDAPStates.get(serverId);
+    }
+
+    /**
+     * Get the highest know change number of the LDAP server with the provided
+     * serverId.
+     * @param serverId The server ID.
+     * @return The highest change number.
+     */
+    public ChangeNumber getMaxCN(short serverId)
+    {
+      return maxCNs.get(serverId);
+    }
+
+    /**
+     * Get an approximation of the date of the oldest missing changes.
+     * serverId.
+     * @param serverId The server ID.
+     * @return The approximation of the date of the oldest missing change.
+     */
+    public Long getApproxFirstMissingDate(short serverId)
+    {
+      return approxFirstMissingDate.get(serverId);
+    }
+
+    /**
+     * Get the number of missing change for the server with the provided state.
+     * @param state The provided server state.
+     * @return The number of missing changes.
+     */
+    public int getMissingChanges(ServerState state)
+    {
+      // Traverse the max Cn transmitted by each server
+      // For each server, get the highest CN know from the current server
+      // Sum the difference betwenn the max and the last
+      int missingChanges = 0;
+      Iterator<Short> itc = maxCNs.keySet().iterator();
+      while (itc.hasNext())
+      {
+        Short sid = itc.next();
+        ChangeNumber maxCN = maxCNs.get(sid);
+        ChangeNumber last = state.getMaxChangeNumber(sid);
+        if (last == null)
+        {
+          last = new ChangeNumber(0,0, sid);
+        }
+        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
+        missingChanges += missingChangesFromSID;
+      }
+      return missingChanges;
+    }
 }

--
Gitblit v1.10.0