From a00e725a364e9eb859ca8ed2c914e6637c94cb9d Mon Sep 17 00:00:00 2001
From: neil_a_wilson <neil_a_wilson@localhost>
Date: Thu, 29 Mar 2007 21:12:30 +0000
Subject: [PATCH] Back out the commit included in revision 1539 because it does not build properly (it appears to reference classes that are not in the repository).

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java |  374 ++++++++++++++++++-----------------------------------
 1 files changed, 127 insertions(+), 247 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index 51f099e..d7e0e2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,31 +26,26 @@
  */
 package org.opends.server.synchronization.changelog;
 
-import static org.opends.server.loggers.Error.logError;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.loggers.Error.logError;
+
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.InitializeRequestMessage;
-import org.opends.server.synchronization.protocol.ErrorMessage;
-import org.opends.server.synchronization.protocol.RoutableMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
+import com.sleepycat.je.DatabaseException;
 import org.opends.server.types.DN;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
 
-import com.sleepycat.je.DatabaseException;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -430,263 +425,148 @@
     }
   }
 
+
   /**
-   * Retrieves the destination handlers for a routable message.
+   * Send back an ack to the server that sent the change.
    *
-   * @param msg The message to route.
-   * @param senderHandler The handler of the server that published this message.
-   * @return The list of destination handlers.
+   * @param changeNumber The ChangeNumber of the change that must be acked.
+   * @param isLDAPserver This boolean indicates if the server that sent the
+   *                     change was an LDAP server or a Changelog server.
    */
-  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
-      ServerHandler senderHandler)
+  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
   {
+    short serverId = changeNumber.getServerId();
+    sendAck(changeNumber, isLDAPserver, serverId);
+  }
 
-    List<ServerHandler> servers =
-      new ArrayList<ServerHandler>();
-
-    logError(ErrorLogCategory.SYNCHRONIZATION,
-        ErrorLogSeverity.SEVERE_ERROR,
-        "getDestinationServers"
-        + " msgDest:" + msg.getDestination() , 1);
-
-    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
-    {
-      // TODO Import from the "closest server" to be implemented
-    }
-    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
-    {
-      if (!senderHandler.isChangelogServer())
-      {
-        // Send to all changelogServers
-        for (ServerHandler destinationHandler : changelogServers.values())
-        {
-          servers.add(destinationHandler);
-        }
-      }
-
-      // Send to all connected LDAP servers
-      for (ServerHandler destinationHandler : connectedServers.values())
-      {
-        // Don't loop on the sender
-        if (destinationHandler == senderHandler)
-          continue;
-        servers.add(destinationHandler);
-      }
-    }
+  /**
+   *
+   * Send back an ack to a server that sent the change.
+   *
+   * @param changeNumber The ChangeNumber of the change that must be acked.
+   * @param isLDAPserver This boolean indicates if the server that sent the
+   *                     change was an LDAP server or a Changelog server.
+   * @param serverId     The identifier of the server from which we
+   *                     received the change..
+   */
+  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+                      short serverId)
+  {
+    ServerHandler handler;
+    if (isLDAPserver)
+      handler = connectedServers.get(serverId);
     else
-    {
-      // Destination is one server
-      ServerHandler destinationHandler =
-        connectedServers.get(msg.getDestination());
-      if (destinationHandler != null)
-      {
-        servers.add(destinationHandler);
-      }
-      else
-      {
-        // the targeted server is NOT connected
-        if (senderHandler.isLDAPserver())
-        {
-          // let's forward to the other changelogs
-          servers.addAll(changelogServers.values());
-        }
-      }
-    }
+      handler = changelogServers.get(serverId);
 
-    return servers;
+    // TODO : check for null handler and log error
+    try
+    {
+      handler.sendAck(changeNumber);
+    } catch (IOException e)
+    {
+      /*
+       * An error happened trying the send back an ack to this server.
+       * Log an error and close the connection to this server.
+       */
+      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+      String message = getMessage(msgID, this.toString())
+                                  + stackTraceToSingleLineString(e);
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+               ErrorLogSeverity.SEVERE_ERROR,
+               message, msgID);
+      handler.shutdown();
+    }
   }
 
   /**
-   * Process an InitializeRequestMessage.
-   *
-   * @param msg The message received and to be processed.
-   * @param senderHandler The server handler of the server that emitted
-   * the message.
+   * Shutdown this ChangelogCache.
    */
-  public void process(RoutableMessage msg, ServerHandler senderHandler)
+  public void shutdown()
   {
-
-    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
-
-    if (servers.isEmpty())
+    // Close session with other changelogs
+    for (ServerHandler serverHandler : changelogServers.values())
     {
-      if (!(msg instanceof InitializeRequestMessage))
-      {
-        // TODO A more elaborated policy is probably needed
-      }
-      else
-      {
-        ErrorMessage errMsg = new ErrorMessage(
-            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
-            "serverID:" + msg.getDestination());
-
-        try
-        {
-          senderHandler.send(errMsg);
-        }
-        catch(IOException ioe)
-        {
-          // TODO Handle error properly (sender timeout in addition)
-        }
-      }
-      return;
+      serverHandler.shutdown();
     }
 
-    for (ServerHandler targetHandler : servers)
+    // Close session with other LDAP servers
+    for (ServerHandler serverHandler : connectedServers.values())
     {
-      try
-      {
-        targetHandler.send(msg);
-      }
-      catch(IOException ioe)
-      {
-        // TODO Handle error properly (sender timeout in addition)
-      }
+      serverHandler.shutdown();
     }
 
+    // Shutdown the dbHandlers
+    synchronized (sourceDbHandlers)
+    {
+      for (DbHandler dbHandler : sourceDbHandlers.values())
+      {
+        dbHandler.shutdown();
+      }
+      sourceDbHandlers.clear();
+    }
   }
 
-    /**
-     * Send back an ack to the server that sent the change.
-     *
-     * @param changeNumber The ChangeNumber of the change that must be acked.
-     * @param isLDAPserver This boolean indicates if the server that sent the
-     *                     change was an LDAP server or a Changelog server.
-     */
-    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+  /**
+   * Returns the ServerState describing the last change from this replica.
+   *
+   * @return The ServerState describing the last change from this replica.
+   */
+  public ServerState getDbServerState()
+  {
+    ServerState serverState = new ServerState();
+    for (DbHandler db : sourceDbHandlers.values())
     {
-      short serverId = changeNumber.getServerId();
-      sendAck(changeNumber, isLDAPserver, serverId);
+      serverState.update(db.getLastChange());
+    }
+    return serverState;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString()
+  {
+    return "ChangelogCache " + baseDn;
+  }
+
+  /**
+   * Check if some server Handler should be removed from flow control state.
+   * @throws IOException If an error happened.
+   */
+  public void checkAllSaturation() throws IOException
+  {
+    for (ServerHandler handler : changelogServers.values())
+    {
+      handler.checkWindow();
     }
 
-    /**
-     *
-     * Send back an ack to a server that sent the change.
-     *
-     * @param changeNumber The ChangeNumber of the change that must be acked.
-     * @param isLDAPserver This boolean indicates if the server that sent the
-     *                     change was an LDAP server or a Changelog server.
-     * @param serverId     The identifier of the server from which we
-     *                     received the change..
-     */
-    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
-        short serverId)
+    for (ServerHandler handler : connectedServers.values())
     {
-      ServerHandler handler;
-      if (isLDAPserver)
-        handler = connectedServers.get(serverId);
-      else
-        handler = changelogServers.get(serverId);
-
-      // TODO : check for null handler and log error
-      try
-      {
-        handler.sendAck(changeNumber);
-      } catch (IOException e)
-      {
-        /*
-         * An error happened trying the send back an ack to this server.
-         * Log an error and close the connection to this server.
-         */
-        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
-        String message = getMessage(msgID, this.toString())
-        + stackTraceToSingleLineString(e);
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.SEVERE_ERROR,
-            message, msgID);
-        handler.shutdown();
-      }
+      handler.checkWindow();
     }
+  }
 
-    /**
-     * Shutdown this ChangelogCache.
-     */
-    public void shutdown()
+  /**
+   * Check if a server that was in flow control can now restart
+   * sending updates.
+   * @param sourceHandler The server that must be checked.
+   * @return true if the server can restart sending changes.
+   *         false if the server can't restart sending changes.
+   */
+  public boolean restartAfterSaturation(ServerHandler sourceHandler)
+  {
+    for (ServerHandler handler : changelogServers.values())
     {
-      // Close session with other changelogs
-      for (ServerHandler serverHandler : changelogServers.values())
-      {
-        serverHandler.shutdown();
-      }
-
-      // Close session with other LDAP servers
-      for (ServerHandler serverHandler : connectedServers.values())
-      {
-        serverHandler.shutdown();
-      }
-
-      // Shutdown the dbHandlers
-      synchronized (sourceDbHandlers)
-      {
-        for (DbHandler dbHandler : sourceDbHandlers.values())
-        {
-          dbHandler.shutdown();
-        }
-        sourceDbHandlers.clear();
-      }
-    }
-
-    /**
-     * Returns the ServerState describing the last change from this replica.
-     *
-     * @return The ServerState describing the last change from this replica.
-     */
-    public ServerState getDbServerState()
-    {
-      ServerState serverState = new ServerState();
-      for (DbHandler db : sourceDbHandlers.values())
-      {
-        serverState.update(db.getLastChange());
-      }
-      return serverState;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String toString()
-    {
-      return "ChangelogCache " + baseDn;
-    }
-
-    /**
-     * Check if some server Handler should be removed from flow control state.
-     * @throws IOException If an error happened.
-     */
-    public void checkAllSaturation() throws IOException
-    {
-      for (ServerHandler handler : changelogServers.values())
-      {
-        handler.checkWindow();
-      }
-
-      for (ServerHandler handler : connectedServers.values())
-      {
-        handler.checkWindow();
-      }
-    }
-
-    /**
-     * Check if a server that was in flow control can now restart
-     * sending updates.
-     * @param sourceHandler The server that must be checked.
-     * @return true if the server can restart sending changes.
-     *         false if the server can't restart sending changes.
-     */
-    public boolean restartAfterSaturation(ServerHandler sourceHandler)
-    {
-      for (ServerHandler handler : changelogServers.values())
-      {
-        if (!handler.restartAfterSaturation(sourceHandler))
+      if (!handler.restartAfterSaturation(sourceHandler))
           return false;
-      }
-
-      for (ServerHandler handler : connectedServers.values())
-      {
-        if (!handler.restartAfterSaturation(sourceHandler))
-          return false;
-      }
-      return true;
     }
+
+    for (ServerHandler handler : connectedServers.values())
+    {
+      if (!handler.restartAfterSaturation(sourceHandler))
+          return false;
+    }
+    return true;
+  }
 }

--
Gitblit v1.10.0