From 40cef7d36084fbe86d34cfa497628d8972c4c9e7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 29 Mar 2007 17:53:41 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java |  366 ++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 243 insertions(+), 123 deletions(-)

diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index d7e0e2a..51f099e 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
  */
 package org.opends.server.synchronization.changelog;
 
+import static org.opends.server.loggers.Error.logError;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.loggers.Error.logError;
-
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
-import com.sleepycat.je.DatabaseException;
-import org.opends.server.types.DN;
-import org.opends.server.types.ErrorLogCategory;
-import org.opends.server.types.ErrorLogSeverity;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
+import com.sleepycat.je.DatabaseException;
+
 /**
  * This class define an in-memory cache that will be used to store
  * the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
     }
   }
 
-
   /**
-   * Send back an ack to the server that sent the change.
+   * Retrieves the destination handlers for a routable message.
    *
-   * @param changeNumber The ChangeNumber of the change that must be acked.
-   * @param isLDAPserver This boolean indicates if the server that sent the
-   *                     change was an LDAP server or a Changelog server.
+   * @param msg The message to route.
+   * @param senderHandler The handler of the server that published this message.
+   * @return The list of destination handlers.
    */
-  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
+      ServerHandler senderHandler)
   {
-    short serverId = changeNumber.getServerId();
-    sendAck(changeNumber, isLDAPserver, serverId);
-  }
 
-  /**
-   *
-   * Send back an ack to a server that sent the change.
-   *
-   * @param changeNumber The ChangeNumber of the change that must be acked.
-   * @param isLDAPserver This boolean indicates if the server that sent the
-   *                     change was an LDAP server or a Changelog server.
-   * @param serverId     The identifier of the server from which we
-   *                     received the change..
-   */
-  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
-                      short serverId)
-  {
-    ServerHandler handler;
-    if (isLDAPserver)
-      handler = connectedServers.get(serverId);
-    else
-      handler = changelogServers.get(serverId);
+    List<ServerHandler> servers =
+      new ArrayList<ServerHandler>();
 
-    // TODO : check for null handler and log error
-    try
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.SEVERE_ERROR,
+        "getDestinationServers"
+        + " msgDest:" + msg.getDestination() , 1);
+
+    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
     {
-      handler.sendAck(changeNumber);
-    } catch (IOException e)
-    {
-      /*
-       * An error happened trying the send back an ack to this server.
-       * Log an error and close the connection to this server.
-       */
-      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
-      String message = getMessage(msgID, this.toString())
-                                  + stackTraceToSingleLineString(e);
-      logError(ErrorLogCategory.SYNCHRONIZATION,
-               ErrorLogSeverity.SEVERE_ERROR,
-               message, msgID);
-      handler.shutdown();
+      // TODO Import from the "closest server" to be implemented
     }
-  }
-
-  /**
-   * Shutdown this ChangelogCache.
-   */
-  public void shutdown()
-  {
-    // Close session with other changelogs
-    for (ServerHandler serverHandler : changelogServers.values())
+    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
     {
-      serverHandler.shutdown();
-    }
-
-    // Close session with other LDAP servers
-    for (ServerHandler serverHandler : connectedServers.values())
-    {
-      serverHandler.shutdown();
-    }
-
-    // Shutdown the dbHandlers
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
+      if (!senderHandler.isChangelogServer())
       {
-        dbHandler.shutdown();
+        // Send to all changelogServers
+        for (ServerHandler destinationHandler : changelogServers.values())
+        {
+          servers.add(destinationHandler);
+        }
       }
-      sourceDbHandlers.clear();
+
+      // Send to all connected LDAP servers
+      for (ServerHandler destinationHandler : connectedServers.values())
+      {
+        // Don't loop on the sender
+        if (destinationHandler == senderHandler)
+          continue;
+        servers.add(destinationHandler);
+      }
     }
+    else
+    {
+      // Destination is one server
+      ServerHandler destinationHandler =
+        connectedServers.get(msg.getDestination());
+      if (destinationHandler != null)
+      {
+        servers.add(destinationHandler);
+      }
+      else
+      {
+        // the targeted server is NOT connected
+        if (senderHandler.isLDAPserver())
+        {
+          // let's forward to the other changelogs
+          servers.addAll(changelogServers.values());
+        }
+      }
+    }
+
+    return servers;
   }
 
   /**
-   * Returns the ServerState describing the last change from this replica.
+   * Process an InitializeRequestMessage.
    *
-   * @return The ServerState describing the last change from this replica.
+   * @param msg The message received and to be processed.
+   * @param senderHandler The server handler of the server that emitted
+   * the message.
    */
-  public ServerState getDbServerState()
+  public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
-    ServerState serverState = new ServerState();
-    for (DbHandler db : sourceDbHandlers.values())
-    {
-      serverState.update(db.getLastChange());
-    }
-    return serverState;
-  }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String toString()
-  {
-    return "ChangelogCache " + baseDn;
-  }
+    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
 
-  /**
-   * Check if some server Handler should be removed from flow control state.
-   * @throws IOException If an error happened.
-   */
-  public void checkAllSaturation() throws IOException
-  {
-    for (ServerHandler handler : changelogServers.values())
+    if (servers.isEmpty())
     {
-      handler.checkWindow();
+      if (!(msg instanceof InitializeRequestMessage))
+      {
+        // TODO A more elaborated policy is probably needed
+      }
+      else
+      {
+        ErrorMessage errMsg = new ErrorMessage(
+            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+            "serverID:" + msg.getDestination());
+
+        try
+        {
+          senderHandler.send(errMsg);
+        }
+        catch(IOException ioe)
+        {
+          // TODO Handle error properly (sender timeout in addition)
+        }
+      }
+      return;
     }
 
-    for (ServerHandler handler : connectedServers.values())
+    for (ServerHandler targetHandler : servers)
     {
-      handler.checkWindow();
+      try
+      {
+        targetHandler.send(msg);
+      }
+      catch(IOException ioe)
+      {
+        // TODO Handle error properly (sender timeout in addition)
+      }
     }
+
   }
 
-  /**
-   * Check if a server that was in flow control can now restart
-   * sending updates.
-   * @param sourceHandler The server that must be checked.
-   * @return true if the server can restart sending changes.
-   *         false if the server can't restart sending changes.
-   */
-  public boolean restartAfterSaturation(ServerHandler sourceHandler)
-  {
-    for (ServerHandler handler : changelogServers.values())
+    /**
+     * Send back an ack to the server that sent the change.
+     *
+     * @param changeNumber The ChangeNumber of the change that must be acked.
+     * @param isLDAPserver This boolean indicates if the server that sent the
+     *                     change was an LDAP server or a Changelog server.
+     */
+    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
     {
-      if (!handler.restartAfterSaturation(sourceHandler))
+      short serverId = changeNumber.getServerId();
+      sendAck(changeNumber, isLDAPserver, serverId);
+    }
+
+    /**
+     *
+     * Send back an ack to a server that sent the change.
+     *
+     * @param changeNumber The ChangeNumber of the change that must be acked.
+     * @param isLDAPserver This boolean indicates if the server that sent the
+     *                     change was an LDAP server or a Changelog server.
+     * @param serverId     The identifier of the server from which we
+     *                     received the change..
+     */
+    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+        short serverId)
+    {
+      ServerHandler handler;
+      if (isLDAPserver)
+        handler = connectedServers.get(serverId);
+      else
+        handler = changelogServers.get(serverId);
+
+      // TODO : check for null handler and log error
+      try
+      {
+        handler.sendAck(changeNumber);
+      } catch (IOException e)
+      {
+        /*
+         * An error happened trying the send back an ack to this server.
+         * Log an error and close the connection to this server.
+         */
+        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+        String message = getMessage(msgID, this.toString())
+        + stackTraceToSingleLineString(e);
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR,
+            message, msgID);
+        handler.shutdown();
+      }
+    }
+
+    /**
+     * Shutdown this ChangelogCache.
+     */
+    public void shutdown()
+    {
+      // Close session with other changelogs
+      for (ServerHandler serverHandler : changelogServers.values())
+      {
+        serverHandler.shutdown();
+      }
+
+      // Close session with other LDAP servers
+      for (ServerHandler serverHandler : connectedServers.values())
+      {
+        serverHandler.shutdown();
+      }
+
+      // Shutdown the dbHandlers
+      synchronized (sourceDbHandlers)
+      {
+        for (DbHandler dbHandler : sourceDbHandlers.values())
+        {
+          dbHandler.shutdown();
+        }
+        sourceDbHandlers.clear();
+      }
+    }
+
+    /**
+     * Returns the ServerState describing the last change from this replica.
+     *
+     * @return The ServerState describing the last change from this replica.
+     */
+    public ServerState getDbServerState()
+    {
+      ServerState serverState = new ServerState();
+      for (DbHandler db : sourceDbHandlers.values())
+      {
+        serverState.update(db.getLastChange());
+      }
+      return serverState;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString()
+    {
+      return "ChangelogCache " + baseDn;
+    }
+
+    /**
+     * Check if some server Handler should be removed from flow control state.
+     * @throws IOException If an error happened.
+     */
+    public void checkAllSaturation() throws IOException
+    {
+      for (ServerHandler handler : changelogServers.values())
+      {
+        handler.checkWindow();
+      }
+
+      for (ServerHandler handler : connectedServers.values())
+      {
+        handler.checkWindow();
+      }
+    }
+
+    /**
+     * Check if a server that was in flow control can now restart
+     * sending updates.
+     * @param sourceHandler The server that must be checked.
+     * @return true if the server can restart sending changes.
+     *         false if the server can't restart sending changes.
+     */
+    public boolean restartAfterSaturation(ServerHandler sourceHandler)
+    {
+      for (ServerHandler handler : changelogServers.values())
+      {
+        if (!handler.restartAfterSaturation(sourceHandler))
           return false;
-    }
+      }
 
-    for (ServerHandler handler : connectedServers.values())
-    {
-      if (!handler.restartAfterSaturation(sourceHandler))
+      for (ServerHandler handler : connectedServers.values())
+      {
+        if (!handler.restartAfterSaturation(sourceHandler))
           return false;
+      }
+      return true;
     }
-    return true;
-  }
 }

--
Gitblit v1.10.0