From eeb2e4f0a3ddfdf3ae9ab8fe3be4f824566f0e8d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 23 Dec 2013 15:45:05 +0000
Subject: [PATCH] Fix feature envy between ListenerThread and ReplicationDomain

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |  114 +++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 75 insertions(+), 39 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ae0a284..b01feaa 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -34,7 +34,6 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opends.messages.Category;
@@ -79,7 +78,7 @@
  *   and which can start receiving updates.
  * <p>
  *   When updates are received the Replication Service calls the
- *   {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
+ *   {@link #processUpdate(UpdateMsg)} method.
  *   ReplicationDomain implementation should implement the appropriate code
  *   for replaying the update on the local repository.
  *   When fully done the subclass must call the
@@ -156,7 +155,7 @@
    * them to the global incoming update message queue for later processing by
    * replay threads.
    */
-  private ListenerThread listenerThread;
+  private volatile DirectoryThread listenerThread = null;
 
   /**
    * A Map used to store all the ReplicationDomains created on this server.
@@ -740,7 +739,7 @@
    * Also responsible for updating the list of pending changes
    * @return the received message - null if none
    */
-  UpdateMsg receive()
+  private UpdateMsg receive()
   {
     UpdateMsg update = null;
 
@@ -2715,25 +2714,6 @@
   }
 
   /**
-   * This method is called when the ReplicationDomain has completed the
-   * processing of a received update synchronously.
-   * In such cases the processUpdateDone () is called and the state
-   * is updated automatically.
-   *
-   * @param msg The UpdateMessage that was processed.
-   */
-  void processUpdateDoneSynchronous(UpdateMsg msg)
-  {
-    /*
-    Warning: in synchronous mode, no way to tell the replay of an update went
-    wrong Just put null in processUpdateDone so that if assured replication
-    is used the ack is sent without error at replay flag.
-    */
-    processUpdateDone(msg, null);
-    state.update(msg.getCSN());
-  }
-
-  /**
    * Check if the domain is connected to a ReplicationServer.
    *
    * @return true if the server is connected, false if not.
@@ -3000,7 +2980,7 @@
    * Starts the receiver side of the Replication Service.
    * <p>
    * After this method has been called, the Replication Service will start
-   * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
+   * calling the {@link #processUpdate(UpdateMsg)}.
    * <p>
    * This method must be called once and must be called after the
    * {@link #startPublishService(ReplicationDomainCfg)}.
@@ -3009,8 +2989,48 @@
   {
     synchronized (sessionLock)
     {
-      // Create the listener thread
-      listenerThread = new ListenerThread(this);
+      final String threadName = "Replica DS(" + getServerId()
+          + ") listener for domain \"" + getBaseDNString() + "\"";
+
+      listenerThread = new DirectoryThread(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("Replication Listener thread starting.");
+          }
+
+          // Loop processing any incoming update messages.
+          while (!listenerThread.isShutdownInitiated())
+          {
+            final UpdateMsg updateMsg = receive();
+            if (updateMsg == null)
+            {
+              // The server is shutting down.
+              listenerThread.initiateShutdown();
+            }
+            else if (processUpdate(updateMsg))
+            {
+              /*
+               * Warning: in synchronous mode, no way to tell the replay of an
+               * update went wrong Just put null in processUpdateDone so that if
+               * assured replication is used the ack is sent without error at
+               * replay flag.
+               */
+              processUpdateDone(updateMsg, null);
+              state.update(updateMsg.getCSN());
+            }
+          }
+
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("Replication Listener thread stopping.");
+          }
+        }
+      }, threadName);
+
       listenerThread.start();
     }
   }
@@ -3041,14 +3061,34 @@
       // Stop the listener thread
       if (listenerThread != null)
       {
-        listenerThread.shutdown();
-        listenerThread.waitForShutdown();
+        listenerThread.initiateShutdown();
+        try
+        {
+          listenerThread.join();
+        }
+        catch (InterruptedException e)
+        {
+          // Give up waiting.
+        }
         listenerThread = null;
       }
     }
   }
 
   /**
+   * Returns {@code true} if the listener thread is shutting down or has
+   * shutdown.
+   *
+   * @return {@code true} if the listener thread is shutting down or has
+   *         shutdown.
+   */
+  protected final boolean isListenerShuttingDown()
+  {
+    final DirectoryThread tmp = listenerThread;
+    return tmp == null || tmp.isShutdownInitiated();
+  }
+
+  /**
    * Restart the Replication service after a {@link #disableService()}.
    * <p>
    * The Replication Service will restart from the point indicated by the
@@ -3065,10 +3105,7 @@
     synchronized (sessionLock)
     {
       broker.start();
-
-      // Create the listener thread
-      listenerThread = new ListenerThread(this);
-      listenerThread.start();
+      startListenService();
     }
   }
 
@@ -3156,6 +3193,8 @@
    */
   public abstract long countEntries() throws DirectoryException;
 
+
+
   /**
    * This method should handle the processing of {@link UpdateMsg} receive from
    * remote replication entities.
@@ -3165,20 +3204,17 @@
    *
    * @param updateMsg
    *          The {@link UpdateMsg} that was received.
-   * @param shutdown
-   *          whether the server initiated shutdown
    * @return A boolean indicating if the processing is completed at return time.
    *         If <code> true </code> is returned, no further processing is
    *         necessary. If <code> false </code> is returned, the subclass should
    *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
    *         update the ServerState When this processing is complete.
    */
-  public abstract boolean processUpdate(UpdateMsg updateMsg,
-      AtomicBoolean shutdown);
+  public abstract boolean processUpdate(UpdateMsg updateMsg);
 
   /**
    * This method must be called after each call to
-   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+   * {@link #processUpdate(UpdateMsg)} when the processing of the
    * update is completed.
    * <p>
    * It is useful for implementation needing to process the update in an
@@ -3192,7 +3228,7 @@
    *          this update, and this is the matching human readable message
    *          describing the problem.
    */
-  public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
+  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
   {
     broker.updateWindowAfterReplay();
 
@@ -3401,7 +3437,7 @@
    * The Replication Service will handle the delivery of this {@link UpdateMsg}
    * to all the participants of this Replication Domain. These members will be
    * receive this {@link UpdateMsg} through a call of the
-   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
+   * {@link #processUpdate(UpdateMsg)} message.
    *
    * @param msg The UpdateMsg that should be pushed.
    */

--
Gitblit v1.10.0