From 0f7f7b8d5e655ccd36aca7d9a3c425dfcd23ad62 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 23 Dec 2013 15:45:05 +0000
Subject: [PATCH] Fix feature envy between ListenerThread and ReplicationDomain

---
 /dev/null                                                                                                                   |  157 --------------------------
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |    4 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java       |    3 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java       |    3 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java    |    3 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |    4 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |  114 ++++++++++++------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java           |   14 +-
 9 files changed, 89 insertions(+), 216 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 754dcec..5dbbd5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4535,7 +4535,7 @@
 
   /** {@inheritDoc} */
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+  public boolean processUpdate(UpdateMsg updateMsg)
   {
     // Ignore message if fractional configuration is inconsistent and
     // we have been passed into bad data set status
@@ -4569,7 +4569,7 @@
       // Put update message into the replay queue
       // (block until some place in the queue is available)
       final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
-      while (!shutdown.get())
+      while (!isListenerShuttingDown())
       {
         // loop until we can offer to the queue or shutdown was initiated
         try
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
deleted file mode 100644
index 88688a6..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License").  You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- *      Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
- */
-package org.opends.server.replication.service;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.opends.messages.Message;
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.protocol.UpdateMsg;
-
-/**
- * Thread that is used to get messages from the Replication servers
- * and replay them in the current server.
- */
-public class ListenerThread extends DirectoryThread
-{
-  /**
-   * The tracer object for the debug logger.
-   */
-  private static final DebugTracer TRACER = getTracer();
-
-  private final ReplicationDomain repDomain;
-  private AtomicBoolean shutdown = new AtomicBoolean(false);
-  private volatile boolean done = false;
-
-
-  /**
-   * Constructor for the ListenerThread.
-   *
-   * @param repDomain the replication domain that created this thread
-   */
-  public ListenerThread(ReplicationDomain repDomain)
-  {
-    super("Replica DS(" + repDomain.getServerId() + ") listener for domain \""
-        + repDomain.getBaseDNString() + "\"");
-    this.repDomain = repDomain;
-  }
-
-  /**
-   * Shutdown this listener thread.
-   */
-  public void shutdown()
-  {
-    shutdown.set(true);
-  }
-
-  /**
-   * Run method for this class.
-   */
-  @Override
-  public void run()
-  {
-    if (debugEnabled())
-    {
-      TRACER.debugInfo("Replication Listener thread starting.");
-    }
-
-    while (!shutdown.get())
-    {
-      UpdateMsg updateMsg = null;
-      try
-      {
-        // Loop receiving update messages and putting them in the update message
-        // queue
-        while (!shutdown.get() && ((updateMsg = repDomain.receive()) != null))
-        {
-          if (repDomain.processUpdate(updateMsg, shutdown))
-          {
-            repDomain.processUpdateDoneSynchronous(updateMsg);
-          }
-        }
-
-        if (updateMsg == null)
-        {
-          shutdown.set(true);
-        }
-      }
-      catch (Exception e)
-      {
-        /*
-         * catch all exceptions happening in repDomain.receive so that the
-         * thread never dies even in case of problems.
-         */
-        Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
-            stackTraceToSingleLineString(e));
-        logError(message);
-      }
-    }
-
-    done = true;
-
-    if (debugEnabled())
-    {
-      TRACER.debugInfo("Replication Listener thread stopping.");
-    }
-  }
-
-
-
-  /**
-   * Wait for the completion of this thread.
-   */
-  public void waitForShutdown()
-  {
-    try
-    {
-      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
-      int n = 0;
-      while (!done && this.isAlive())
-      {
-        Thread.sleep(50);
-        n++;
-        if (n >= FACTOR)
-        {
-          TRACER.debugInfo("Interrupting listener thread for dn "
-              + repDomain.getBaseDNString() + " in DS "
-              + repDomain.getServerId());
-          this.interrupt();
-        }
-      }
-    }
-    catch (InterruptedException e)
-    {
-      // exit the loop if this thread is interrupted.
-    }
-  }
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ae0a284..b01feaa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -34,7 +34,6 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opends.messages.Category;
@@ -79,7 +78,7 @@
  *   and which can start receiving updates.
  * <p>
  *   When updates are received the Replication Service calls the
- *   {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
+ *   {@link #processUpdate(UpdateMsg)} method.
  *   ReplicationDomain implementation should implement the appropriate code
  *   for replaying the update on the local repository.
  *   When fully done the subclass must call the
@@ -156,7 +155,7 @@
    * them to the global incoming update message queue for later processing by
    * replay threads.
    */
-  private ListenerThread listenerThread;
+  private volatile DirectoryThread listenerThread = null;
 
   /**
    * A Map used to store all the ReplicationDomains created on this server.
@@ -740,7 +739,7 @@
    * Also responsible for updating the list of pending changes
    * @return the received message - null if none
    */
-  UpdateMsg receive()
+  private UpdateMsg receive()
   {
     UpdateMsg update = null;
 
@@ -2715,25 +2714,6 @@
   }
 
   /**
-   * This method is called when the ReplicationDomain has completed the
-   * processing of a received update synchronously.
-   * In such cases the processUpdateDone () is called and the state
-   * is updated automatically.
-   *
-   * @param msg The UpdateMessage that was processed.
-   */
-  void processUpdateDoneSynchronous(UpdateMsg msg)
-  {
-    /*
-    Warning: in synchronous mode, no way to tell the replay of an update went
-    wrong Just put null in processUpdateDone so that if assured replication
-    is used the ack is sent without error at replay flag.
-    */
-    processUpdateDone(msg, null);
-    state.update(msg.getCSN());
-  }
-
-  /**
    * Check if the domain is connected to a ReplicationServer.
    *
    * @return true if the server is connected, false if not.
@@ -3000,7 +2980,7 @@
    * Starts the receiver side of the Replication Service.
    * <p>
    * After this method has been called, the Replication Service will start
-   * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
+   * calling the {@link #processUpdate(UpdateMsg)}.
    * <p>
    * This method must be called once and must be called after the
    * {@link #startPublishService(ReplicationDomainCfg)}.
@@ -3009,8 +2989,48 @@
   {
     synchronized (sessionLock)
     {
-      // Create the listener thread
-      listenerThread = new ListenerThread(this);
+      final String threadName = "Replica DS(" + getServerId()
+          + ") listener for domain \"" + getBaseDNString() + "\"";
+
+      listenerThread = new DirectoryThread(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("Replication Listener thread starting.");
+          }
+
+          // Loop processing any incoming update messages.
+          while (!listenerThread.isShutdownInitiated())
+          {
+            final UpdateMsg updateMsg = receive();
+            if (updateMsg == null)
+            {
+              // The server is shutting down.
+              listenerThread.initiateShutdown();
+            }
+            else if (processUpdate(updateMsg))
+            {
+              /*
+               * Warning: in synchronous mode, no way to tell the replay of an
+               * update went wrong Just put null in processUpdateDone so that if
+               * assured replication is used the ack is sent without error at
+               * replay flag.
+               */
+              processUpdateDone(updateMsg, null);
+              state.update(updateMsg.getCSN());
+            }
+          }
+
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("Replication Listener thread stopping.");
+          }
+        }
+      }, threadName);
+
       listenerThread.start();
     }
   }
@@ -3041,14 +3061,34 @@
       // Stop the listener thread
       if (listenerThread != null)
       {
-        listenerThread.shutdown();
-        listenerThread.waitForShutdown();
+        listenerThread.initiateShutdown();
+        try
+        {
+          listenerThread.join();
+        }
+        catch (InterruptedException e)
+        {
+          // Give up waiting.
+        }
         listenerThread = null;
       }
     }
   }
 
   /**
+   * Returns {@code true} if the listener thread is shutting down or has
+   * shutdown.
+   *
+   * @return {@code true} if the listener thread is shutting down or has
+   *         shutdown.
+   */
+  protected final boolean isListenerShuttingDown()
+  {
+    final DirectoryThread tmp = listenerThread;
+    return tmp == null || tmp.isShutdownInitiated();
+  }
+
+  /**
    * Restart the Replication service after a {@link #disableService()}.
    * <p>
    * The Replication Service will restart from the point indicated by the
@@ -3065,10 +3105,7 @@
     synchronized (sessionLock)
     {
       broker.start();
-
-      // Create the listener thread
-      listenerThread = new ListenerThread(this);
-      listenerThread.start();
+      startListenService();
     }
   }
 
@@ -3156,6 +3193,8 @@
    */
   public abstract long countEntries() throws DirectoryException;
 
+
+
   /**
    * This method should handle the processing of {@link UpdateMsg} receive from
    * remote replication entities.
@@ -3165,20 +3204,17 @@
    *
    * @param updateMsg
    *          The {@link UpdateMsg} that was received.
-   * @param shutdown
-   *          whether the server initiated shutdown
    * @return A boolean indicating if the processing is completed at return time.
    *         If <code> true </code> is returned, no further processing is
    *         necessary. If <code> false </code> is returned, the subclass should
    *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
    *         update the ServerState When this processing is complete.
    */
-  public abstract boolean processUpdate(UpdateMsg updateMsg,
-      AtomicBoolean shutdown);
+  public abstract boolean processUpdate(UpdateMsg updateMsg);
 
   /**
    * This method must be called after each call to
-   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+   * {@link #processUpdate(UpdateMsg)} when the processing of the
    * update is completed.
    * <p>
    * It is useful for implementation needing to process the update in an
@@ -3192,7 +3228,7 @@
    *          this update, and this is the matching human readable message
    *          describing the problem.
    */
-  public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
+  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
   {
     broker.updateWindowAfterReplay();
 
@@ -3401,7 +3437,7 @@
    * The Replication Service will handle the delivery of this {@link UpdateMsg}
    * to all the participants of this Replication Domain. These members will be
    * receive this {@link UpdateMsg} through a call of the
-   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
+   * {@link #processUpdate(UpdateMsg)} message.
    *
    * @param msg The UpdateMsg that should be pushed.
    */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index 68db0b8..2a9b6c1 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -27,7 +27,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
@@ -88,7 +87,7 @@
   }
 
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+  public boolean processUpdate(UpdateMsg updateMsg)
   {
     return false;
   }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index d46b539..59a7bb0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -32,7 +32,6 @@
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -632,7 +631,7 @@
     }
 
     @Override
-    public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+    public boolean processUpdate(UpdateMsg updateMsg)
     {
       if (queue != null)
         queue.add(updateMsg);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
index 1b48882..8122a1a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -121,7 +121,7 @@
       "uid=simultaneous2");
 
       // Put the message in the replay queue
-      domain.processUpdate(modDnMsg, SHUTDOWN);
+      domain.processUpdate(modDnMsg);
 
       // Make the domain replay the change from the replay queue
       domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -135,7 +135,7 @@
       "uid=simulatneouswrong");
 
       // Put the message in the replay queue
-      domain.processUpdate(modDnMsg, SHUTDOWN);
+      domain.processUpdate(modDnMsg);
 
       // Make the domain replay the change from the replay queue
       // and resolve conflict
@@ -210,7 +210,7 @@
             null);
 
       // Put the message in the replay queue
-      domain.processUpdate(addMsg, SHUTDOWN);
+      domain.processUpdate(addMsg);
 
       // Make the domain replay the change from the replay queue
       domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -294,7 +294,7 @@
             null);
 
       // Put the message in the replay queue
-      domain.processUpdate(addMsg, SHUTDOWN);
+      domain.processUpdate(addMsg);
 
       // Make the domain replay the change from the replay queue
       domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -399,7 +399,7 @@
       delMsg.setSubtreeDelete(true);
 
       // Put the message in the replay queue
-      domain.processUpdate(delMsg, SHUTDOWN);
+      domain.processUpdate(delMsg);
       // Make the domain replay the change from the replay queue
       domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
@@ -474,7 +474,7 @@
       // NOT SUBTREE
 
       // Put the message in the replay queue
-      domain.processUpdate(delMsg, SHUTDOWN);
+      domain.processUpdate(delMsg);
       // Make the domain replay the change from the replay queue
       domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
@@ -557,7 +557,7 @@
           new ArrayList<Attribute>());
 
       // Put the message in the replay queue
-      domain.processUpdate(addMsg, SHUTDOWN);
+      domain.processUpdate(addMsg);
       // Make the domain replay the change from the replay queue
       domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 9d213ba..282c423 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -33,7 +33,6 @@
 import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -57,7 +56,6 @@
 import org.testng.annotations.Test;
 
 import static java.util.Arrays.*;
-
 import static org.assertj.core.api.Assertions.*;
 import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.loggers.ErrorLogger.*;
@@ -504,7 +502,7 @@
     }
 
     @Override
-    public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+    public boolean processUpdate(UpdateMsg updateMsg)
     {
       checkUpdateAssuredParameters(updateMsg);
       nReceivedUpdates++;
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index df894f7..601a39b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -31,7 +31,6 @@
 import java.io.OutputStream;
 import java.util.SortedSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.config.ConfigException;
 import org.opends.server.replication.plugin.DomainFakeCfg;
@@ -151,7 +150,7 @@
   }
 
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+  public boolean processUpdate(UpdateMsg updateMsg)
   {
     if (queue != null)
       queue.add(updateMsg);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index 12fbafa..b0a49c0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -31,7 +31,6 @@
 import java.io.OutputStream;
 import java.util.SortedSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.config.ConfigException;
 import org.opends.server.replication.plugin.DomainFakeCfg;
@@ -136,7 +135,7 @@
   }
 
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+  public boolean processUpdate(UpdateMsg updateMsg)
   {
     if (queue != null)
       queue.add(updateMsg);

--
Gitblit v1.10.0