mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
23.45.2013 0f7f7b8d5e655ccd36aca7d9a3c425dfcd23ad62
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -34,7 +34,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -79,7 +78,7 @@
 *   and which can start receiving updates.
 * <p>
 *   When updates are received the Replication Service calls the
 *   {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
 *   {@link #processUpdate(UpdateMsg)} method.
 *   ReplicationDomain implementation should implement the appropriate code
 *   for replaying the update on the local repository.
 *   When fully done the subclass must call the
@@ -156,7 +155,7 @@
   * them to the global incoming update message queue for later processing by
   * replay threads.
   */
  private ListenerThread listenerThread;
  private volatile DirectoryThread listenerThread = null;
  /**
   * A Map used to store all the ReplicationDomains created on this server.
@@ -740,7 +739,7 @@
   * Also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
  UpdateMsg receive()
  private UpdateMsg receive()
  {
    UpdateMsg update = null;
@@ -2715,25 +2714,6 @@
  }
  /**
   * This method is called when the ReplicationDomain has completed the
   * processing of a received update synchronously.
   * In such cases the processUpdateDone () is called and the state
   * is updated automatically.
   *
   * @param msg The UpdateMessage that was processed.
   */
  void processUpdateDoneSynchronous(UpdateMsg msg)
  {
    /*
    Warning: in synchronous mode, no way to tell the replay of an update went
    wrong Just put null in processUpdateDone so that if assured replication
    is used the ack is sent without error at replay flag.
    */
    processUpdateDone(msg, null);
    state.update(msg.getCSN());
  }
  /**
   * Check if the domain is connected to a ReplicationServer.
   *
   * @return true if the server is connected, false if not.
@@ -3000,7 +2980,7 @@
   * Starts the receiver side of the Replication Service.
   * <p>
   * After this method has been called, the Replication Service will start
   * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
   * calling the {@link #processUpdate(UpdateMsg)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(ReplicationDomainCfg)}.
@@ -3009,8 +2989,48 @@
  {
    synchronized (sessionLock)
    {
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      final String threadName = "Replica DS(" + getServerId()
          + ") listener for domain \"" + getBaseDNString() + "\"";
      listenerThread = new DirectoryThread(new Runnable()
      {
        @Override
        public void run()
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("Replication Listener thread starting.");
          }
          // Loop processing any incoming update messages.
          while (!listenerThread.isShutdownInitiated())
          {
            final UpdateMsg updateMsg = receive();
            if (updateMsg == null)
            {
              // The server is shutting down.
              listenerThread.initiateShutdown();
            }
            else if (processUpdate(updateMsg))
            {
              /*
               * Warning: in synchronous mode, no way to tell the replay of an
               * update went wrong Just put null in processUpdateDone so that if
               * assured replication is used the ack is sent without error at
               * replay flag.
               */
              processUpdateDone(updateMsg, null);
              state.update(updateMsg.getCSN());
            }
          }
          if (debugEnabled())
          {
            TRACER.debugInfo("Replication Listener thread stopping.");
          }
        }
      }, threadName);
      listenerThread.start();
    }
  }
@@ -3041,14 +3061,34 @@
      // Stop the listener thread
      if (listenerThread != null)
      {
        listenerThread.shutdown();
        listenerThread.waitForShutdown();
        listenerThread.initiateShutdown();
        try
        {
          listenerThread.join();
        }
        catch (InterruptedException e)
        {
          // Give up waiting.
        }
        listenerThread = null;
      }
    }
  }
  /**
   * Returns {@code true} if the listener thread is shutting down or has
   * shutdown.
   *
   * @return {@code true} if the listener thread is shutting down or has
   *         shutdown.
   */
  protected final boolean isListenerShuttingDown()
  {
    final DirectoryThread tmp = listenerThread;
    return tmp == null || tmp.isShutdownInitiated();
  }
  /**
   * Restart the Replication service after a {@link #disableService()}.
   * <p>
   * The Replication Service will restart from the point indicated by the
@@ -3065,10 +3105,7 @@
    synchronized (sessionLock)
    {
      broker.start();
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      listenerThread.start();
      startListenService();
    }
  }
@@ -3156,6 +3193,8 @@
   */
  public abstract long countEntries() throws DirectoryException;
  /**
   * This method should handle the processing of {@link UpdateMsg} receive from
   * remote replication entities.
@@ -3165,20 +3204,17 @@
   *
   * @param updateMsg
   *          The {@link UpdateMsg} that was received.
   * @param shutdown
   *          whether the server initiated shutdown
   * @return A boolean indicating if the processing is completed at return time.
   *         If <code> true </code> is returned, no further processing is
   *         necessary. If <code> false </code> is returned, the subclass should
   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
   *         update the ServerState When this processing is complete.
   */
  public abstract boolean processUpdate(UpdateMsg updateMsg,
      AtomicBoolean shutdown);
  public abstract boolean processUpdate(UpdateMsg updateMsg);
  /**
   * This method must be called after each call to
   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
   * {@link #processUpdate(UpdateMsg)} when the processing of the
   * update is completed.
   * <p>
   * It is useful for implementation needing to process the update in an
@@ -3192,7 +3228,7 @@
   *          this update, and this is the matching human readable message
   *          describing the problem.
   */
  public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
  {
    broker.updateWindowAfterReplay();
@@ -3401,7 +3437,7 @@
   * The Replication Service will handle the delivery of this {@link UpdateMsg}
   * to all the participants of this Replication Domain. These members will be
   * receive this {@link UpdateMsg} through a call of the
   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
   * {@link #processUpdate(UpdateMsg)} message.
   *
   * @param msg The UpdateMsg that should be pushed.
   */