From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |   41 +++++++++++++++++++----------------------
 1 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index e92d185..2c6f204 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -70,6 +70,7 @@
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
+import org.opends.server.replication.protocol.InitializeRcvAckMsg;
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ProtocolVersion;
@@ -1591,11 +1592,11 @@
    */
   public void process(RoutableMsg msg, ServerHandler senderHandler)
   {
-
     // Test the message for which a ReplicationServer is expected
     // to be the destination
     if (!(msg instanceof InitializeRequestMsg) &&
         !(msg instanceof InitializeTargetMsg) &&
+        !(msg instanceof InitializeRcvAckMsg) &&
         !(msg instanceof EntryMsg) &&
         !(msg instanceof DoneMsg) &&
         (msg.getDestination() == this.replicationServer.getServerId()))
@@ -1616,7 +1617,7 @@
           // Monitoring information requested by a DS
           MonitorMsg monitorMsg =
             createGlobalTopologyMonitorMsg(
-                msg.getDestination(), msg.getsenderID(), false);
+                msg.getDestination(), msg.getSenderID(), false);
 
            if (monitorMsg != null)
           {
@@ -1634,7 +1635,7 @@
           // Monitoring information requested by a RS
           MonitorMsg monitorMsg =
             createLocalTopologyMonitorMsg(msg.getDestination(),
-            msg.getsenderID());
+            msg.getSenderID());
 
           if (monitorMsg != null)
           {
@@ -1668,7 +1669,7 @@
             NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName()));
         mb1.append("serverID:" + msg.getDestination());
         ErrorMsg errMsg = new ErrorMsg(
-          msg.getsenderID(), mb1.toMessage());
+          msg.getSenderID(), mb1.toMessage());
         try
         {
           senderHandler.send(errMsg);
@@ -1687,15 +1688,15 @@
     if (servers.isEmpty())
     {
       MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+          this.baseDn, Integer.toString(msg.getDestination())));
       mb.append(" In Replication Server=" +
         this.replicationServer.getMonitorInstanceName());
-      mb.append(" domain =" + this.baseDn);
-      mb.append(" unroutable message =" + msg.toString());
-      mb.append(" routing table is empty");
+      mb.append(" unroutable message =" + msg.getClass().getSimpleName());
+      mb.append(" Details:routing table is empty");
       ErrorMsg errMsg = new ErrorMsg(
         this.replicationServer.getServerId(),
-        msg.getsenderID(),
+        msg.getSenderID(),
         mb.toMessage());
       logError(mb.toMessage());
       try
@@ -1728,18 +1729,14 @@
            * to its destination server.
            * Send back an error to the originator of the message.
            */
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
-          mb.append(stackTraceToSingleLineString(ioe));
-          mb.append(" ");
-          mb.append(msg.getClass().getCanonicalName());
-          logError(mb.toMessage());
-
           MessageBuilder mb1 = new MessageBuilder();
-          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
-          mb1.append("serverID:" + msg.getDestination());
+          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+              this.baseDn, Integer.toString(msg.getDestination())));
+          mb1.append(" unroutable message =" + msg.getClass().getSimpleName());
+          mb1.append(" Details: " + ioe.getLocalizedMessage());
           ErrorMsg errMsg = new ErrorMsg(
-            msg.getsenderID(), mb1.toMessage());
+            msg.getSenderID(), mb1.toMessage());
+          logError(mb1.toMessage());
           try
           {
             senderHandler.send(errMsg);
@@ -2729,7 +2726,7 @@
           // This is a response for an earlier request whose computing is
           // already complete.
           logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
-              Integer.toString(msg.getsenderID())));
+              Integer.toString(msg.getSenderID())));
           return;
         }
         // Here is the RS state : list <serverID, lastChangeNumber>
@@ -2738,7 +2735,7 @@
         wrkMonitorData.setMaxCNs(replServerState);
 
         // store the remote RS states.
-        wrkMonitorData.setRSState(msg.getsenderID(), replServerState);
+        wrkMonitorData.setRSState(msg.getSenderID(), replServerState);
 
         // Store the remote LDAP servers states
         Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2789,7 +2786,7 @@
             TRACER.debugInfo(
               "In " + this +
               " baseDn=" + baseDn +
-              " Processed msg from " + msg.getsenderID() +
+              " Processed msg from " + msg.getSenderID() +
               " New monitor data: " + wrkMonitorData.toString());
         }
       }

--
Gitblit v1.10.0