From 0f9ee85fd0b36220ef6a3ee8d2b9f5f6f02b26bd Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 11 Feb 2008 13:42:42 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |  170 ++++++++++++++++++++++++++++----------------------------
 1 files changed, 86 insertions(+), 84 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 0800236..ad68300 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
     while (update == null)
     {
       InitializeRequestMessage initMsg = null;
-      synchronized (broker)
+      ReplicationMessage msg;
+      try
       {
-        ReplicationMessage msg;
-        try
+        msg = broker.receive();
+        if (msg == null)
         {
-          msg = broker.receive();
-          if (msg == null)
-          {
-            // The server is in the shutdown process
-            return null;
-          }
-
-          if (debugEnabled())
-            if (!(msg instanceof HeartbeatMessage))
-              TRACER.debugVerbose("Message received <" + msg + ">");
-
-          if (msg instanceof AckMessage)
-          {
-            AckMessage ack = (AckMessage) msg;
-            receiveAck(ack);
-            }
-            else if (msg instanceof InitializeRequestMessage)
-          {
-            // Another server requests us to provide entries
-            // for a total update
-              initMsg = (InitializeRequestMessage)msg;
-            }
-            else if (msg instanceof InitializeTargetMessage)
-          {
-            // Another server is exporting its entries to us
-            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
-
-            try
-            {
-              // This must be done while we are still holding the
-              // broker lock because we are now going to receive a
-              // bunch of entries from the remote server and we
-              // want the import thread to catch them and
-              // not the ListenerThread.
-              initialize(importMsg);
-              }
-              catch(DirectoryException de)
-            {
-              // Returns an error message to notify the sender
-              ErrorMessage errorMsg =
-                new ErrorMessage(importMsg.getsenderID(),
-                de.getMessageObject());
-              MessageBuilder mb = new MessageBuilder();
-              mb.append(de.getMessageObject());
-              TRACER.debugInfo(Message.toString(mb.toMessage()));
-              broker.publish(errorMsg);
-            }
-            }
-            else if (msg instanceof ErrorMessage)
-          {
-            if (ieContext != null)
-            {
-              // This is an error termination for the 2 following cases :
-              // - either during an export
-              // - or before an import really started
-              //   For example, when we publish a request and the
-              //  replicationServer did not find any import source.
-                abandonImportExport((ErrorMessage)msg);
-              }
-              else
-            {
-              /* We can receive an error message from the replication server
-               * in the following cases :
-               * - we connected with an incorrect generation id
-               */
-                ErrorMessage errorMsg = (ErrorMessage)msg;
-              logError(ERR_ERROR_MSG_RECEIVED.get(
-                errorMsg.getDetails()));
-            }
-            }
-            else if (msg instanceof UpdateMessage)
-          {
-            update = (UpdateMessage) msg;
-            receiveUpdate(update);
-          }
-          }
-          catch (SocketTimeoutException e)
-        {
-        // just retry
+          // The server is in the shutdown process
+          return null;
         }
+
+        if (debugEnabled())
+          if (!(msg instanceof HeartbeatMessage))
+            TRACER.debugVerbose("Message received <" + msg + ">");
+
+        if (msg instanceof AckMessage)
+        {
+          AckMessage ack = (AckMessage) msg;
+          receiveAck(ack);
+        }
+        else if (msg instanceof InitializeRequestMessage)
+        {
+          // Another server requests us to provide entries
+          // for a total update
+          initMsg = (InitializeRequestMessage)msg;
+        }
+        else if (msg instanceof InitializeTargetMessage)
+        {
+          // Another server is exporting its entries to us
+          InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+
+          try
+          {
+            // This must be done while we are still holding the
+            // broker lock because we are now going to receive a
+            // bunch of entries from the remote server and we
+            // want the import thread to catch them and
+            // not the ListenerThread.
+            initialize(importMsg);
+          }
+          catch(DirectoryException de)
+          {
+            // Returns an error message to notify the sender
+            ErrorMessage errorMsg =
+              new ErrorMessage(importMsg.getsenderID(),
+                  de.getMessageObject());
+            MessageBuilder mb = new MessageBuilder();
+            mb.append(de.getMessageObject());
+            TRACER.debugInfo(Message.toString(mb.toMessage()));
+            broker.publish(errorMsg);
+          }
+        }
+        else if (msg instanceof ErrorMessage)
+        {
+          if (ieContext != null)
+          {
+            // This is an error termination for the 2 following cases :
+            // - either during an export
+            // - or before an import really started
+            //   For example, when we publish a request and the
+            //  replicationServer did not find any import source.
+            abandonImportExport((ErrorMessage)msg);
+          }
+          else
+          {
+            /* We can receive an error message from the replication server
+             * in the following cases :
+             * - we connected with an incorrect generation id
+             */
+            ErrorMessage errorMsg = (ErrorMessage)msg;
+            logError(ERR_ERROR_MSG_RECEIVED.get(
+                errorMsg.getDetails()));
+          }
+        }
+        else if (msg instanceof UpdateMessage)
+        {
+          update = (UpdateMessage) msg;
+          receiveUpdate(update);
+        }
+      }
+      catch (SocketTimeoutException e)
+      {
+        // just retry
       }
       // Test if we have received and export request message and
       // if that's the case handle it now.
@@ -1259,7 +1256,10 @@
     shutdown = true;
 
     // Stop the listener thread
-    listenerThread.shutdown();
+    if (listenerThread != null)
+    {
+      listenerThread.shutdown();
+    }
 
     synchronized (this)
     {
@@ -1274,7 +1274,8 @@
     broker.stop();
 
     // Wait for the listener thread to stop
-    listenerThread.waitForShutdown();
+    if (listenerThread != null)
+      listenerThread.waitForShutdown();
 
     // wait for completion of the persistentServerState thread.
     try
@@ -1441,6 +1442,7 @@
       {
         if (!dependency)
         {
+          broker.updateWindowAfterReplay();
           if (msg.isAssured())
             ack(msg.getChangeNumber());
           incProcessedUpdates();

--
Gitblit v1.10.0