From c6c3de416bcc406346299a860905c9e71870a4ab Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 27 Aug 2007 11:58:47 +0000
Subject: [PATCH] complement for issue 2097 : total update fails sending a Message to ReplicationCache

---
 opends/src/server/org/opends/server/replication/server/ReplicationCache.java                |   28 +++++++++++--
 opends/src/server/org/opends/server/tasks/InitializeTask.java                               |   11 ++++-
 opends/src/messages/src/org/opends/messages/MessageDescriptor.java                          |    5 ++
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java               |   50 +++++++++++++++++--------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java |   11 ++---
 5 files changed, 74 insertions(+), 31 deletions(-)

diff --git a/opends/src/messages/src/org/opends/messages/MessageDescriptor.java b/opends/src/messages/src/org/opends/messages/MessageDescriptor.java
index aa6365f..9e39e19 100644
--- a/opends/src/messages/src/org/opends/messages/MessageDescriptor.java
+++ b/opends/src/messages/src/org/opends/messages/MessageDescriptor.java
@@ -963,7 +963,10 @@
    * @return int ordinal value
    */
   public int getOrdinal() {
-    return this.ordinal;
+    if (this.ordinal == null)
+      return 0;
+    else
+      return this.ordinal;
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 5ed95cb..05233d3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -799,9 +799,10 @@
 
     if (update == null)
     {
-      synchronized (broker)
+      while (update == null)
       {
-        while (update == null)
+        InitializeRequestMessage initMsg = null;
+        synchronized (broker)
         {
           ReplicationMessage msg;
           try
@@ -822,32 +823,27 @@
             {
               // Another server requests us to provide entries
               // for a total update
-              InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
-              try
-              {
-                initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
-                                 null);
-              }
-              catch(DirectoryException de)
-              {
-                // An error message has been sent to the peer
-                // Nothing more to do locally
-              }
+              initMsg = (InitializeRequestMessage) msg;
             }
             else if (msg instanceof InitializeTargetMessage)
             {
               // Another server is exporting its entries to us
-              InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+              InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
 
               try
               {
-                importBackend(initMsg);
+                // This must be done while we are still holding the
+                // broker lock because we are now going to receive a
+                // bunch of entries from the remote server and we
+                // want the import thread to catch them and
+                // not the ListenerThread.
+                importBackend(importMsg);
               }
               catch(DirectoryException de)
               {
                 // Return an error message to notify the sender
                 ErrorMessage errorMsg =
-                  new ErrorMessage(initMsg.getsenderID(),
+                  new ErrorMessage(importMsg.getsenderID(),
                                    de.getMessageObject());
                 MessageBuilder mb = new MessageBuilder();
                 mb.append(de.getMessageObject());
@@ -880,6 +876,28 @@
             // just retry
           }
         }
+        // Test if we have received and export request message and
+        // if that's the case handle it now.
+        // This must be done outside of the portion of code protected
+        // by the broker lock so that we keep receiveing update
+        // when we are doing and export and so that a possible
+        // closure of the socket happening when we are publishing the
+        // entries to the remote can be handled by the other
+        // ListenerThread when they call this method and therefore the
+        // broker.receive() method.
+        if (initMsg != null)
+        {
+          try
+          {
+            initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+                null);
+          }
+          catch(DirectoryException de)
+          {
+            // An error message has been sent to the peer
+            // Nothing more to do locally
+          }
+        }
       }
     }
     return update;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index ef31ab3..d618cf8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -583,8 +583,8 @@
       {
         // TODO Handle error properly (sender timeout in addition)
         /*
-         * An error happened trying the send back an ack to this server.
-         * Log an error and close the connection to this server.
+         * An error happened trying the send back an error to this server.
+         * Log an error and close the connection to the sender server.
          */
         MessageBuilder mb2 = new MessageBuilder();
         mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -604,8 +604,9 @@
         catch(IOException ioe)
         {
           /*
-           * An error happened trying the send back an ack to this server.
-           * Log an error and close the connection to this server.
+           * An error happened trying the send a routabled message
+           * to its destination server.
+           * Send back an error to the originator of the message.
            */
           MessageBuilder mb = new MessageBuilder();
           mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
@@ -613,7 +614,24 @@
           mb.append(" ");
           mb.append(msg.getClass().getCanonicalName());
           logError(mb.toMessage());
-          senderHandler.shutdown();
+
+          MessageBuilder mb1 = new MessageBuilder();
+          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+          mb1.append("serverID:" + msg.getDestination());
+          ErrorMessage errMsg = new ErrorMessage(
+              msg.getsenderID(), mb1.toMessage());
+          try
+          {
+            senderHandler.send(errMsg);
+          }
+          catch(IOException ioe1)
+          {
+            // an error happened on the sender session trying to recover
+            // from an error on the receiver session.
+            // We don't have much solution left beside closing the sessions.
+            senderHandler.shutdown();
+            targetHandler.shutdown();
+          }
           // TODO Handle error properly (sender timeout in addition)
         }
       }
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTask.java b/opends/src/server/org/opends/server/tasks/InitializeTask.java
index 0524545..05afb59 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -25,6 +25,9 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.tasks;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+
 import static org.opends.server.config.ConfigConstants.*;
 import static org.opends.server.core.DirectoryServer.getAttributeType;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -32,7 +35,6 @@
 
 import java.util.List;
 
-import org.opends.messages.MessageBuilder;
 import org.opends.messages.TaskMessages;
 import org.opends.server.backends.task.Task;
 import org.opends.server.backends.task.TaskState;
@@ -73,6 +75,8 @@
   // completed
   long left = 0;
 
+  private Message initTaskError = null;
+
   /**
    * {@inheritDoc}
    */
@@ -160,6 +164,9 @@
       initState = TaskState.STOPPED_BY_ERROR;
     }
 
+    if (initTaskError != null)
+      logError(initTaskError);
+
     if (debugEnabled())
     {
       TRACER.debugInfo("InitializeTask is ending with state:%s",
@@ -181,7 +188,7 @@
     {
       if (de != null)
       {
-        logError(de.getMessageObject());
+        initTaskError = de.getMessageObject();
       }
       if (debugEnabled())
       {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index 0236f64..a119ab3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -111,7 +111,6 @@
   private static final DebugTracer TRACER = getTracer();
 
   private static final int WINDOW_SIZE = 10;
-  private static final int CHANGELOG_QUEUE_SIZE = 100;
 
   /**
    * A "person" entry
@@ -501,15 +500,13 @@
           // Check that the left counter.
           AttributeType taskStateType =
             DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
-          String leftString =
-            resultEntry.getAttributeValue(taskStateType,
+          resultEntry.getAttributeValue(taskStateType,
                 DirectoryStringSyntax.DECODER);
 
           // Check that the total counter.
           taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
-          String totalString =
-           resultEntry.getAttributeValue(taskStateType,
+          resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
         }
         catch(Exception e)
@@ -544,7 +541,7 @@
             log(logMessages.get(0));
             log(expectedMessage.toString());
             assertTrue(logMessages.get(0).indexOf(
-                expectedMessage.toString())>0);
+                expectedMessage.toString())>=0);
           }
         }
       }
@@ -1387,7 +1384,7 @@
     String testCase = "InitializeNoSource";
     log("Starting "+testCase);
 
-    // Start SS
+    // Start Replication Server
     changelog1 = createChangelogServer(changelog1ID);
 
     // Creates config to synchronize suffix

--
Gitblit v1.10.0