From 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 12 Jul 2007 15:41:32 +0000
Subject: [PATCH] Fix for 1895 Summary: Total update does not work with 3 servers that are also replication servers

---
 opends/src/server/org/opends/server/replication/server/ReplicationCache.java |  166 +++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 130 insertions(+), 36 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 2fbc4da..24a167f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.AckMessage;
 import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.InitializeRequestMessage;
 import org.opends.server.replication.protocol.RoutableMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
 import org.opends.server.types.DN;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
    * We add new TreeSet in the HashMap when a new replication server register
    * to this replication server.
    */
+
   private Map<Short, ServerHandler> replicationServers =
     new ConcurrentHashMap<Short, ServerHandler>();
 
@@ -253,6 +254,11 @@
         return false;
       }
       connectedServers.put(handler.getServerId(), handler);
+
+      // Update the remote replication servers with our list
+      // of connected LDAP servers
+      sendReplServerInfo();
+
       return true;
     }
   }
@@ -269,7 +275,13 @@
     if (handler.isReplicationServer())
       replicationServers.remove(handler.getServerId());
     else
+    {
       connectedServers.remove(handler.getServerId());
+
+      // Update the remote replication servers with our list
+      // of connected LDAP servers
+      sendReplServerInfo();
+    }
   }
 
   /**
@@ -312,6 +324,12 @@
         return false;
       }
       replicationServers.put(handler.getServerId(), handler);
+
+      // Update this server with the list of LDAP servers
+      // already connected
+      handler.sendInfo(
+          new ReplServerInfoMessage(getConnectedLDAPservers()));
+
       return true;
     }
   }
@@ -376,6 +394,22 @@
     return sourceDbHandlers.keySet();
   }
 
+  /**
+   * Returns as a set of String the list of LDAP servers connected to us.
+   * Each string is the serverID of a connected LDAP server.
+   *
+   * @return The set of connected LDAP servers
+   */
+  public List<String> getConnectedLDAPservers()
+  {
+    List<String> mySet = new ArrayList<String>(0);
+
+    for (ServerHandler handler : connectedServers.values())
+    {
+      mySet.add(String.valueOf(handler.getServerId()));
+    }
+    return mySet;
+  }
 
   /**
    * Creates and returns an iterator.
@@ -473,15 +507,9 @@
   protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
       ServerHandler senderHandler)
   {
-
     List<ServerHandler> servers =
       new ArrayList<ServerHandler>();
 
-    logError(ErrorLogCategory.SYNCHRONIZATION,
-        ErrorLogSeverity.SEVERE_ERROR,
-        "getDestinationServers"
-        + " msgDest:" + msg.getDestination() , 1);
-
     if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
     {
       // TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
         }
       }
 
-      // Send to all connected LDAP servers
+      // Sends to all connected LDAP servers
       for (ServerHandler destinationHandler : connectedServers.values())
       {
         // Don't loop on the sender
@@ -518,14 +546,20 @@
       else
       {
         // the targeted server is NOT connected
+        // Let's search for THE changelog server that MAY
+        // have the targeted server connected.
         if (senderHandler.isLDAPserver())
         {
-          // let's forward to the other changelogs
-          servers.addAll(replicationServers.values());
+          for (ServerHandler h : replicationServers.values())
+          {
+            if (h.isRemoteLDAPServer(msg.getDestination()))
+            {
+              servers.add(h);
+            }
+          }
         }
       }
     }
-
     return servers;
   }
 
@@ -543,37 +577,53 @@
 
     if (servers.isEmpty())
     {
-      if (!(msg instanceof InitializeRequestMessage))
-      {
-        // TODO A more elaborated policy is probably needed
-      }
-      else
-      {
-        ErrorMessage errMsg = new ErrorMessage(
-            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
-            "serverID:" + msg.getDestination());
-
-        try
-        {
-          senderHandler.send(errMsg);
-        }
-        catch(IOException ioe)
-        {
-          // TODO Handle error properly (sender timeout in addition)
-        }
-      }
-      return;
-    }
-
-    for (ServerHandler targetHandler : servers)
-    {
+      ErrorMessage errMsg = new ErrorMessage(
+          msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+          "serverID:" + msg.getDestination());
       try
       {
-        targetHandler.send(msg);
+        senderHandler.send(errMsg);
       }
       catch(IOException ioe)
       {
         // TODO Handle error properly (sender timeout in addition)
+        /*
+         * An error happened trying the send back an ack to this server.
+         * Log an error and close the connection to this server.
+         */
+        int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
+        String message = getMessage(msgID, this.toString())
+        + stackTraceToSingleLineString(ioe);
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR,
+            message, msgID);
+        senderHandler.shutdown();
+      }
+    }
+    else
+    {
+      for (ServerHandler targetHandler : servers)
+      {
+        try
+        {
+          targetHandler.send(msg);
+        }
+        catch(IOException ioe)
+        {
+          /*
+           * An error happened trying the send back an ack to this server.
+           * Log an error and close the connection to this server.
+           */
+          int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
+          String message = getMessage(msgID, this.toString())
+          + stackTraceToSingleLineString(ioe) + " "
+          + msg.getClass().getCanonicalName();
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.SEVERE_ERROR,
+              message, msgID);
+          senderHandler.shutdown();
+          // TODO Handle error properly (sender timeout in addition)
+        }
       }
     }
 
@@ -722,4 +772,48 @@
       }
       return true;
     }
+
+    /**
+     * Send a ReplServerInfoMessage to all the connected replication servers
+     * in order to let them know our connected LDAP servers.
+     */
+    private void sendReplServerInfo()
+    {
+      ReplServerInfoMessage info =
+        new ReplServerInfoMessage(getConnectedLDAPservers());
+      for (ServerHandler handler : replicationServers.values())
+      {
+        try
+        {
+          handler.sendInfo(info);
+        }
+        catch (IOException e)
+        {
+          /*
+           * An error happened trying the send back an ack to this server.
+           * Log an error and close the connection to this server.
+           */
+          int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_INFO;
+          String message = getMessage(msgID, this.toString())
+          + stackTraceToSingleLineString(e);
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.SEVERE_ERROR,
+              message, msgID);
+          handler.shutdown();
+        }
+      }
+    }
+
+    /**
+     * Sets the replication server informations for the provided
+     * handler from the provided ReplServerInfoMessage.
+     *
+     * @param handler The server handler from which the info was received.
+     * @param infoMsg The information message that was received.
+     */
+    public void setReplServerInfo(
+        ServerHandler handler, ReplServerInfoMessage infoMsg)
+    {
+      handler.setReplServerInfo(infoMsg);
+    }
 }

--
Gitblit v1.10.0