mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
23.45.2013 0f7f7b8d5e655ccd36aca7d9a3c425dfcd23ad62
Fix feature envy between ListenerThread and ReplicationDomain

ListenerThread was 100% implemented in terms of ReplicationDomain method calls. It should therefore be part of ReplicationDomain.

* inline ListenerThread into ReplicationDomain and use new DirectoryThread lifecycle methods
* remove shutdown parameter from ReplicationDomain.processUpdate()
* reduce visibility of ReplicationDomain.receive()
* inline ReplicationDomain.processUpdateDoneSynchronous().
1 files deleted
8 files modified
305 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 4 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java 157 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 114 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java 14 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 4 ●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4535,7 +4535,7 @@
  /** {@inheritDoc} */
  @Override
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    // Ignore message if fractional configuration is inconsistent and
    // we have been passed into bad data set status
@@ -4569,7 +4569,7 @@
      // Put update message into the replay queue
      // (block until some place in the queue is available)
      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
      while (!shutdown.get())
      while (!isListenerShuttingDown())
      {
        // loop until we can offer to the queue or shutdown was initiated
        try
opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
File was deleted
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -34,7 +34,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -79,7 +78,7 @@
 *   and which can start receiving updates.
 * <p>
 *   When updates are received the Replication Service calls the
 *   {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
 *   {@link #processUpdate(UpdateMsg)} method.
 *   ReplicationDomain implementation should implement the appropriate code
 *   for replaying the update on the local repository.
 *   When fully done the subclass must call the
@@ -156,7 +155,7 @@
   * them to the global incoming update message queue for later processing by
   * replay threads.
   */
  private ListenerThread listenerThread;
  private volatile DirectoryThread listenerThread = null;
  /**
   * A Map used to store all the ReplicationDomains created on this server.
@@ -740,7 +739,7 @@
   * Also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
  UpdateMsg receive()
  private UpdateMsg receive()
  {
    UpdateMsg update = null;
@@ -2715,25 +2714,6 @@
  }
  /**
   * This method is called when the ReplicationDomain has completed the
   * processing of a received update synchronously.
   * In such cases the processUpdateDone () is called and the state
   * is updated automatically.
   *
   * @param msg The UpdateMessage that was processed.
   */
  void processUpdateDoneSynchronous(UpdateMsg msg)
  {
    /*
    Warning: in synchronous mode, no way to tell the replay of an update went
    wrong Just put null in processUpdateDone so that if assured replication
    is used the ack is sent without error at replay flag.
    */
    processUpdateDone(msg, null);
    state.update(msg.getCSN());
  }
  /**
   * Check if the domain is connected to a ReplicationServer.
   *
   * @return true if the server is connected, false if not.
@@ -3000,7 +2980,7 @@
   * Starts the receiver side of the Replication Service.
   * <p>
   * After this method has been called, the Replication Service will start
   * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
   * calling the {@link #processUpdate(UpdateMsg)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(ReplicationDomainCfg)}.
@@ -3009,8 +2989,48 @@
  {
    synchronized (sessionLock)
    {
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      final String threadName = "Replica DS(" + getServerId()
          + ") listener for domain \"" + getBaseDNString() + "\"";
      listenerThread = new DirectoryThread(new Runnable()
      {
        @Override
        public void run()
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("Replication Listener thread starting.");
          }
          // Loop processing any incoming update messages.
          while (!listenerThread.isShutdownInitiated())
          {
            final UpdateMsg updateMsg = receive();
            if (updateMsg == null)
            {
              // The server is shutting down.
              listenerThread.initiateShutdown();
            }
            else if (processUpdate(updateMsg))
            {
              /*
               * Warning: in synchronous mode, no way to tell the replay of an
               * update went wrong Just put null in processUpdateDone so that if
               * assured replication is used the ack is sent without error at
               * replay flag.
               */
              processUpdateDone(updateMsg, null);
              state.update(updateMsg.getCSN());
            }
          }
          if (debugEnabled())
          {
            TRACER.debugInfo("Replication Listener thread stopping.");
          }
        }
      }, threadName);
      listenerThread.start();
    }
  }
@@ -3041,14 +3061,34 @@
      // Stop the listener thread
      if (listenerThread != null)
      {
        listenerThread.shutdown();
        listenerThread.waitForShutdown();
        listenerThread.initiateShutdown();
        try
        {
          listenerThread.join();
        }
        catch (InterruptedException e)
        {
          // Give up waiting.
        }
        listenerThread = null;
      }
    }
  }
  /**
   * Returns {@code true} if the listener thread is shutting down or has
   * shutdown.
   *
   * @return {@code true} if the listener thread is shutting down or has
   *         shutdown.
   */
  protected final boolean isListenerShuttingDown()
  {
    final DirectoryThread tmp = listenerThread;
    return tmp == null || tmp.isShutdownInitiated();
  }
  /**
   * Restart the Replication service after a {@link #disableService()}.
   * <p>
   * The Replication Service will restart from the point indicated by the
@@ -3065,10 +3105,7 @@
    synchronized (sessionLock)
    {
      broker.start();
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      listenerThread.start();
      startListenService();
    }
  }
@@ -3156,6 +3193,8 @@
   */
  public abstract long countEntries() throws DirectoryException;
  /**
   * This method should handle the processing of {@link UpdateMsg} receive from
   * remote replication entities.
@@ -3165,20 +3204,17 @@
   *
   * @param updateMsg
   *          The {@link UpdateMsg} that was received.
   * @param shutdown
   *          whether the server initiated shutdown
   * @return A boolean indicating if the processing is completed at return time.
   *         If <code> true </code> is returned, no further processing is
   *         necessary. If <code> false </code> is returned, the subclass should
   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
   *         update the ServerState When this processing is complete.
   */
  public abstract boolean processUpdate(UpdateMsg updateMsg,
      AtomicBoolean shutdown);
  public abstract boolean processUpdate(UpdateMsg updateMsg);
  /**
   * This method must be called after each call to
   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
   * {@link #processUpdate(UpdateMsg)} when the processing of the
   * update is completed.
   * <p>
   * It is useful for implementation needing to process the update in an
@@ -3192,7 +3228,7 @@
   *          this update, and this is the matching human readable message
   *          describing the problem.
   */
  public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
  {
    broker.updateWindowAfterReplay();
@@ -3401,7 +3437,7 @@
   * The Replication Service will handle the delivery of this {@link UpdateMsg}
   * to all the participants of this Replication Domain. These members will be
   * receive this {@link UpdateMsg} through a call of the
   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
   * {@link #processUpdate(UpdateMsg)} message.
   *
   * @param msg The UpdateMsg that should be pushed.
   */
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -27,7 +27,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
@@ -88,7 +87,7 @@
  }
  @Override
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    return false;
  }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -32,7 +32,6 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -632,7 +631,7 @@
    }
    @Override
    public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
    public boolean processUpdate(UpdateMsg updateMsg)
    {
      if (queue != null)
        queue.add(updateMsg);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -121,7 +121,7 @@
      "uid=simultaneous2");
      // Put the message in the replay queue
      domain.processUpdate(modDnMsg, SHUTDOWN);
      domain.processUpdate(modDnMsg);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -135,7 +135,7 @@
      "uid=simulatneouswrong");
      // Put the message in the replay queue
      domain.processUpdate(modDnMsg, SHUTDOWN);
      domain.processUpdate(modDnMsg);
      // Make the domain replay the change from the replay queue
      // and resolve conflict
@@ -210,7 +210,7 @@
            null);
      // Put the message in the replay queue
      domain.processUpdate(addMsg, SHUTDOWN);
      domain.processUpdate(addMsg);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -294,7 +294,7 @@
            null);
      // Put the message in the replay queue
      domain.processUpdate(addMsg, SHUTDOWN);
      domain.processUpdate(addMsg);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -399,7 +399,7 @@
      delMsg.setSubtreeDelete(true);
      // Put the message in the replay queue
      domain.processUpdate(delMsg, SHUTDOWN);
      domain.processUpdate(delMsg);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -474,7 +474,7 @@
      // NOT SUBTREE
      // Put the message in the replay queue
      domain.processUpdate(delMsg, SHUTDOWN);
      domain.processUpdate(delMsg);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -557,7 +557,7 @@
          new ArrayList<Attribute>());
      // Put the message in the replay queue
      domain.processUpdate(addMsg, SHUTDOWN);
      domain.processUpdate(addMsg);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -33,7 +33,6 @@
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -57,7 +56,6 @@
import org.testng.annotations.Test;
import static java.util.Arrays.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -504,7 +502,7 @@
    }
    @Override
    public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
    public boolean processUpdate(UpdateMsg updateMsg)
    {
      checkUpdateAssuredParameters(updateMsg);
      nReceivedUpdates++;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -31,7 +31,6 @@
import java.io.OutputStream;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.plugin.DomainFakeCfg;
@@ -151,7 +150,7 @@
  }
  @Override
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    if (queue != null)
      queue.add(updateMsg);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -31,7 +31,6 @@
import java.io.OutputStream;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.plugin.DomainFakeCfg;
@@ -136,7 +135,7 @@
  }
  @Override
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    if (queue != null)
      queue.add(updateMsg);