From 2ec1e20dacc4606317fc9e38890117df638a1fff Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 11 Feb 2008 13:42:42 +0000
Subject: [PATCH] 

---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java           |    1 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                             |   36 ++++++--
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java |    8 ++
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java                                  |    5 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java                             |  170 +++++++++++++++++++++---------------------
 5 files changed, 124 insertions(+), 96 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 3774933..3ff5c08 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -56,6 +56,7 @@
   private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
   private boolean shutdown = false;
   private boolean done = false;
+  private static int count = 0;
 
   /**
    * Constructor for the ReplayThread.
@@ -64,7 +65,7 @@
    */
   public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
   {
-     super("Replication Replay thread");
+     super("Replication Replay thread " + count++);
      this.updateToReplayQueue = updateToReplayQueue;
   }
 
@@ -130,7 +131,7 @@
   {
     try
     {
-      while (done == false)
+      while ((done == false) && (this.isAlive()))
       {
         Thread.sleep(50);
       }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a21ce43..2eeedb0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -954,17 +954,9 @@
         {
           WindowMessage windowMsg = (WindowMessage) msg;
           sendWindow.release(windowMsg.getNumAck());
-        } else
+        }
+        else
         {
-          if (msg instanceof UpdateMessage)
-          {
-            rcvWindow--;
-            if (rcvWindow < halfRcvWindow)
-            {
-              session.publish(new WindowMessage(halfRcvWindow));
-              rcvWindow += halfRcvWindow;
-            }
-          }
           return msg;
         }
       } catch (SocketTimeoutException e)
@@ -988,6 +980,30 @@
   }
 
   /**
+   * This method allows to do the necessary computing for the window
+   * management after treatment by the worker threads.
+   *
+   * This should be called once the replay thread have done their job
+   * and the window can be open again.
+   */
+  public synchronized void updateWindowAfterReplay()
+  {
+    try
+    {
+      rcvWindow--;
+      if (rcvWindow < halfRcvWindow)
+      {
+        session.publish(new WindowMessage(halfRcvWindow));
+        rcvWindow += halfRcvWindow;
+      }
+    } catch (IOException e)
+    {
+      // Any error on the socket will be handled by the thread calling receive()
+      // just ignore.
+    }
+  }
+
+  /**
    * stop the server.
    */
   public void stop()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 0800236..ad68300 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
     while (update == null)
     {
       InitializeRequestMessage initMsg = null;
-      synchronized (broker)
+      ReplicationMessage msg;
+      try
       {
-        ReplicationMessage msg;
-        try
+        msg = broker.receive();
+        if (msg == null)
         {
-          msg = broker.receive();
-          if (msg == null)
-          {
-            // The server is in the shutdown process
-            return null;
-          }
-
-          if (debugEnabled())
-            if (!(msg instanceof HeartbeatMessage))
-              TRACER.debugVerbose("Message received <" + msg + ">");
-
-          if (msg instanceof AckMessage)
-          {
-            AckMessage ack = (AckMessage) msg;
-            receiveAck(ack);
-            }
-            else if (msg instanceof InitializeRequestMessage)
-          {
-            // Another server requests us to provide entries
-            // for a total update
-              initMsg = (InitializeRequestMessage)msg;
-            }
-            else if (msg instanceof InitializeTargetMessage)
-          {
-            // Another server is exporting its entries to us
-            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
-
-            try
-            {
-              // This must be done while we are still holding the
-              // broker lock because we are now going to receive a
-              // bunch of entries from the remote server and we
-              // want the import thread to catch them and
-              // not the ListenerThread.
-              initialize(importMsg);
-              }
-              catch(DirectoryException de)
-            {
-              // Returns an error message to notify the sender
-              ErrorMessage errorMsg =
-                new ErrorMessage(importMsg.getsenderID(),
-                de.getMessageObject());
-              MessageBuilder mb = new MessageBuilder();
-              mb.append(de.getMessageObject());
-              TRACER.debugInfo(Message.toString(mb.toMessage()));
-              broker.publish(errorMsg);
-            }
-            }
-            else if (msg instanceof ErrorMessage)
-          {
-            if (ieContext != null)
-            {
-              // This is an error termination for the 2 following cases :
-              // - either during an export
-              // - or before an import really started
-              //   For example, when we publish a request and the
-              //  replicationServer did not find any import source.
-                abandonImportExport((ErrorMessage)msg);
-              }
-              else
-            {
-              /* We can receive an error message from the replication server
-               * in the following cases :
-               * - we connected with an incorrect generation id
-               */
-                ErrorMessage errorMsg = (ErrorMessage)msg;
-              logError(ERR_ERROR_MSG_RECEIVED.get(
-                errorMsg.getDetails()));
-            }
-            }
-            else if (msg instanceof UpdateMessage)
-          {
-            update = (UpdateMessage) msg;
-            receiveUpdate(update);
-          }
-          }
-          catch (SocketTimeoutException e)
-        {
-        // just retry
+          // The server is in the shutdown process
+          return null;
         }
+
+        if (debugEnabled())
+          if (!(msg instanceof HeartbeatMessage))
+            TRACER.debugVerbose("Message received <" + msg + ">");
+
+        if (msg instanceof AckMessage)
+        {
+          AckMessage ack = (AckMessage) msg;
+          receiveAck(ack);
+        }
+        else if (msg instanceof InitializeRequestMessage)
+        {
+          // Another server requests us to provide entries
+          // for a total update
+          initMsg = (InitializeRequestMessage)msg;
+        }
+        else if (msg instanceof InitializeTargetMessage)
+        {
+          // Another server is exporting its entries to us
+          InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+
+          try
+          {
+            // This must be done while we are still holding the
+            // broker lock because we are now going to receive a
+            // bunch of entries from the remote server and we
+            // want the import thread to catch them and
+            // not the ListenerThread.
+            initialize(importMsg);
+          }
+          catch(DirectoryException de)
+          {
+            // Returns an error message to notify the sender
+            ErrorMessage errorMsg =
+              new ErrorMessage(importMsg.getsenderID(),
+                  de.getMessageObject());
+            MessageBuilder mb = new MessageBuilder();
+            mb.append(de.getMessageObject());
+            TRACER.debugInfo(Message.toString(mb.toMessage()));
+            broker.publish(errorMsg);
+          }
+        }
+        else if (msg instanceof ErrorMessage)
+        {
+          if (ieContext != null)
+          {
+            // This is an error termination for the 2 following cases :
+            // - either during an export
+            // - or before an import really started
+            //   For example, when we publish a request and the
+            //  replicationServer did not find any import source.
+            abandonImportExport((ErrorMessage)msg);
+          }
+          else
+          {
+            /* We can receive an error message from the replication server
+             * in the following cases :
+             * - we connected with an incorrect generation id
+             */
+            ErrorMessage errorMsg = (ErrorMessage)msg;
+            logError(ERR_ERROR_MSG_RECEIVED.get(
+                errorMsg.getDetails()));
+          }
+        }
+        else if (msg instanceof UpdateMessage)
+        {
+          update = (UpdateMessage) msg;
+          receiveUpdate(update);
+        }
+      }
+      catch (SocketTimeoutException e)
+      {
+        // just retry
       }
       // Test if we have received and export request message and
       // if that's the case handle it now.
@@ -1259,7 +1256,10 @@
     shutdown = true;
 
     // Stop the listener thread
-    listenerThread.shutdown();
+    if (listenerThread != null)
+    {
+      listenerThread.shutdown();
+    }
 
     synchronized (this)
     {
@@ -1274,7 +1274,8 @@
     broker.stop();
 
     // Wait for the listener thread to stop
-    listenerThread.waitForShutdown();
+    if (listenerThread != null)
+      listenerThread.waitForShutdown();
 
     // wait for completion of the persistentServerState thread.
     try
@@ -1441,6 +1442,7 @@
       {
         if (!dependency)
         {
+          broker.updateWindowAfterReplay();
           if (msg.isAssured())
             ack(msg.getChangeNumber());
           incProcessedUpdates();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index 2d673c2..6be17f5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -155,6 +155,7 @@
         while (true)
         {
           broker.receive();
+          broker.updateWindowAfterReplay();
           rcvCount++;
         }
       }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 8186d82..1494068 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -248,6 +248,7 @@
                       "uid");
       server1.publish(msg);
       ReplicationMessage msg2 = server2.receive();
+      server2.updateWindowAfterReplay();
       if (msg2 instanceof DeleteMsg)
       {
         DeleteMsg del = (DeleteMsg) msg2;
@@ -263,6 +264,7 @@
       msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
       server1.publish(msg);
       msg2 = server2.receive();
+      server2.updateWindowAfterReplay();
       if (msg2 instanceof DeleteMsg)
       {
         DeleteMsg del = (DeleteMsg) msg2;
@@ -280,6 +282,7 @@
                       "other-uid");
       server2.publish(msg);
       msg2 = server1.receive();
+      server1.updateWindowAfterReplay();
       if (msg2 instanceof DeleteMsg)
       {
         DeleteMsg del = (DeleteMsg) msg2;
@@ -295,6 +298,7 @@
       msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
       server2.publish(msg);
       msg2 = server1.receive();
+      server1.updateWindowAfterReplay();
       if (msg2 instanceof DeleteMsg)
       {
         DeleteMsg del = (DeleteMsg) msg2;
@@ -329,6 +333,7 @@
                              100, replicationServerPort, 1000, false);
 
       ReplicationMessage msg2 = broker.receive();
+      broker.updateWindowAfterReplay();
       if (!(msg2 instanceof DeleteMsg))
         fail("ReplicationServer basic transmission failed:" + msg2);
       else
@@ -367,6 +372,7 @@
                              100, replicationServerPort, 5000, state);
 
       ReplicationMessage msg2 = broker.receive();
+      broker.updateWindowAfterReplay();
       if (!(msg2 instanceof DeleteMsg))
       {
         fail("ReplicationServer basic transmission failed:" + msg2);
@@ -776,6 +782,7 @@
             msg2 = broker2.receive();
             if (msg2 == null)
               break;
+            broker2.updateWindowAfterReplay();
           }
           catch (Exception e)
           {
@@ -982,6 +989,7 @@
         while (true)
         {
           ReplicationMessage msg = broker.receive();
+          broker.updateWindowAfterReplay();
           if (msg == null)
             break;
           }

--
Gitblit v1.10.0