mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.12.2014 2cf4412179a4ca8610d7fbb2108040377290bf82
OPENDJ-1453 (CR-3697) Change time heart beat change numbers should be synced with updates

Left over changes from a failed attempt at sending replica offline messages after all update messages have been sent on replica shutdown (see the JIRA issue for details):
- Shutdown sequence improvements (reordered shutdown stages)
- Code cleanups / refactorings / clarifications



DirectoryServer.java:
In shutDown(), removed dead code + reordered stages to: shutdown connection handlers, then work queue, then replication, then the rest.
shutting down the work queue waits for the worker threads to exit for ServerShutdownMonitor.WAIT_TIME. To be improved by OPENDJ-1469 2 phase shutdown
Made several constants final.
Made several methods private.

ServerShutdownMonitor.java:
Extracted WAIT_TIME constants to reuse it in DirectoryServer.shutDown().

ReplicationBroker.java
Reordered shutdown sequence: first shutdown changeTime heartbeat publisher thread, then RS heartbeat monitoring thread, then set no connected RS.


ReplicationDomain.java, DummyReplicationDomain.java:
Made status private + added signalNewStatus().

LDAPReplicationDomain.java:
Consequence of the change to signalNewStatus().

ReplicationServerDomain.java:
Added PendingStatusMessages.toString().
Made some methods private.
In sendPendingTopologyMsgs(), avoid building a topology message if there is no RSs to send it to.


PendingChange.java:
Code cleanup.
Removed useless field/methods targetDN, getTargetDN(), setOp().
Added toString().

PendingChanges.java:
In putLocalOperation(), avoid storing synchronization operations because they will never be sent (see code in pushCommittedChanges()).
In pushCommittedChanges(), made better use of TreeMap API + do not return int anymore (it was never used).
In commitAndPushCommittedChanges(), do not return int anymore (it was never used).

RemotePendingChanges.java:
Consequence of the change to PendingChange.getTargetDN().
Renamed targetDn local variables to targetDN.


TraditionalWorkerThread.java, CSNGenerator.java:
Code cleanup.
12 files modified
421 ■■■■ changed files
opends/src/server/org/opends/server/core/DirectoryServer.java 104 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/ServerShutdownMonitor.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/extensions/TraditionalWorkerThread.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/CSNGenerator.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChange.java 58 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 59 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java 93 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 35 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 12 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/DirectoryServer.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2010-2013 ForgeRock AS.
 *      Portions Copyright 2010-2014 ForgeRock AS.
 */
package org.opends.server.core;
@@ -136,43 +136,43 @@
   * Returned when the user specified the --checkStartability option with other
   * options like printing the usage, dumping messages, displaying version, etc.
   */
  private static int NOTHING_TO_DO = 0;
  private static final int NOTHING_TO_DO = 0;
  /**
   * Returned when the user specified the --checkStartability option with
   * some incompatible arguments.
   */
  private static int CHECK_ERROR = 1;
  private static final int CHECK_ERROR = 1;
  /**
   * The server is already started.
   */
  private static int SERVER_ALREADY_STARTED = 98;
  private static final int SERVER_ALREADY_STARTED = 98;
  /**
   * The server must be started as detached process.
   */
  private static int START_AS_DETACH = 99;
  private static final int START_AS_DETACH = 99;
  /**
   * The server must be started as a non-detached process.
   */
  private static int START_AS_NON_DETACH = 100;
  private static final int START_AS_NON_DETACH = 100;
  /**
   * The server must be started as a window service.
   */
  private static int START_AS_WINDOWS_SERVICE = 101;
  private static final int START_AS_WINDOWS_SERVICE = 101;
  /**
   * The server must be started as detached and it is being called from the
   * Windows Service.
   */
  private static int START_AS_DETACH_CALLED_FROM_WINDOWS_SERVICE = 102;
  private static final int START_AS_DETACH_CALLED_FROM_WINDOWS_SERVICE = 102;
  /**
   * The server must be started as detached process and should not produce any
   * output.
   */
  private static int START_AS_DETACH_QUIET = 103;
  private static final int START_AS_DETACH_QUIET = 103;
  /**
   * The server must be started as non-detached process and should not produce
   * any output.
   */
  private static int START_AS_NON_DETACH_QUIET = 104;
  private static final int START_AS_NON_DETACH_QUIET = 104;
  /** The policy to use regarding single structural objectclass enforcement. */
  private AcceptRejectWarn singleStructuralClassPolicy;
@@ -524,7 +524,7 @@
  private int lookthroughLimit;
  /** The current active persistent searches. */
  private AtomicInteger activePSearches = new AtomicInteger(0);
  private final AtomicInteger activePSearches = new AtomicInteger(0);
  /** The maximum number of concurrent persistent searches. */
  private int maxPSearches;
@@ -951,8 +951,7 @@
   * @throws  InitializationException  If a problem occurs while attempting to
   *                                   bootstrap the server.
   */
  public void bootstrapServer()
         throws InitializationException
  private void bootstrapServer() throws InitializationException
  {
    // First, make sure that the server isn't currently running.  If it isn't,
    // then make sure that no other thread will try to start or bootstrap the
@@ -2015,7 +2014,7 @@
   *                                   the backends that is not related to the
   *                                   server configuration.
   */
  public void initializeBackends()
  private void initializeBackends()
          throws ConfigException, InitializationException
  {
    backendConfigManager = new BackendConfigManager();
@@ -2119,9 +2118,8 @@
   *                              workflow conflicts with the workflow
   *                              ID of an existing workflow.
   */
  public static void createAndRegisterWorkflowsWithDefaultNetworkGroup(
      Backend backend
      ) throws DirectoryException
  private static void createAndRegisterWorkflowsWithDefaultNetworkGroup(
      Backend backend) throws DirectoryException
  {
    // Create a workflow for each backend base DN and register the workflow
    // with the default/internal/admin network group.
@@ -2152,10 +2150,8 @@
   *                              workflow conflicts with the workflow
   *                              ID of an existing workflow.
   */
  public static WorkflowImpl createWorkflow(
      DN      baseDN,
      Backend backend
      ) throws DirectoryException
  private static WorkflowImpl createWorkflow(DN baseDN, Backend backend)
      throws DirectoryException
  {
    String backendID = backend.getBackendID();
@@ -2348,7 +2344,7 @@
   *                                   attempting to initialize and start the
   *                                   Directory Server.
   */
  public void configureWorkflowsManual()
  private void configureWorkflowsManual()
      throws ConfigException, InitializationException
  {
    // First of all re-initialize the current workflow configuration
@@ -2436,7 +2432,7 @@
   *                                   the group manager that is not related to
   *                                   the server configuration.
   */
  public void initializeGroupManager()
  private void initializeGroupManager()
         throws ConfigException, InitializationException
  {
    try
@@ -7915,16 +7911,9 @@
      directoryServer.shuttingDown = true;
    }
    try {
      directoryServer.configHandler.getConfigRootEntry();
    } catch (Exception e) {
    }
    // Send an alert notification that the server is shutting down.
    Message message = NOTE_SERVER_SHUTDOWN.get(className, reason);
    sendAlertNotification(directoryServer, ALERT_TYPE_SERVER_SHUTDOWN,
            message);
        NOTE_SERVER_SHUTDOWN.get(className, reason));
    // Create a shutdown monitor that will watch the rest of the shutdown
@@ -7951,7 +7940,18 @@
    }
    directoryServer.connectionHandlers.clear();
    if (directoryServer.workQueue != null)
    {
      directoryServer.workQueue.finalizeWorkQueue(reason);
      directoryServer.workQueue.waitUntilIdle(ServerShutdownMonitor.WAIT_TIME);
    }
    // shutdown replication
    for (SynchronizationProvider provider :
         directoryServer.synchronizationProviders)
    {
      provider.finalizeSynchronizationProvider();
    }
    // Call the shutdown plugins, and then finalize all the plugins defined in
    // the server.
@@ -7961,14 +7961,6 @@
      directoryServer.pluginConfigManager.finalizePlugins();
    }
    // shutdown the Synchronization Providers
    for (SynchronizationProvider provider :
         directoryServer.synchronizationProviders)
    {
      provider.finalizeSynchronizationProvider();
    }
    // Deregister the shutdown hook.
    if (directoryServer.shutdownHook != null)
    {
@@ -7980,13 +7972,6 @@
    }
    // Stop the work queue.
    if (directoryServer.workQueue != null)
    {
      directoryServer.workQueue.finalizeWorkQueue(reason);
    }
    // Notify all the shutdown listeners.
    for (ServerShutdownListener shutdownListener :
         directoryServer.shutdownListeners)
@@ -8151,9 +8136,8 @@
          StringBuilder failureReason = new StringBuilder();
          if (! LockFileManager.releaseLock(lockFile, failureReason))
          {
            message = WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK.
                get(backend.getBackendID(), String.valueOf(failureReason));
            logError(message);
            logError(WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK.get(
                backend.getBackendID(), String.valueOf(failureReason)));
            // FIXME -- Do we need to send an admin alert?
          }
        }
@@ -8164,9 +8148,8 @@
            TRACER.debugCaught(DebugLogLevel.ERROR, e2);
          }
          message = WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK.
              get(backend.getBackendID(), stackTraceToSingleLineString(e2));
          logError(message);
          logError(WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK.get(
              backend.getBackendID(), stackTraceToSingleLineString(e2)));
          // FIXME -- Do we need to send an admin alert?
        }
      }
@@ -8187,14 +8170,11 @@
    }
    // Release exclusive lock held on server.lock file
    String serverLockFileName = LockFileManager.getServerLockFileName();
    StringBuilder failureReason = new StringBuilder();
    try {
        if (!LockFileManager.releaseLock(serverLockFileName,
                failureReason)) {
            message = NOTE_SERVER_SHUTDOWN.get(className, failureReason);
            logError(message);
        String serverLockFileName = LockFileManager.getServerLockFileName();
        StringBuilder failureReason = new StringBuilder();
        if (!LockFileManager.releaseLock(serverLockFileName, failureReason)) {
            logError(NOTE_SERVER_SHUTDOWN.get(className, failureReason));
        }
    } catch (Exception e) {
        if (debugEnabled()) {
@@ -9441,13 +9421,11 @@
   * @return Returns the class loader to be used with this directory
   *         server application.
   */
  public static ClassLoader getClassLoader()
  private static ClassLoader getClassLoader()
  {
    return ClassLoaderProvider.getInstance().getClassLoader();
  }
  /**
   * Loads the named class using this directory server application's
   * class loader.
@@ -9646,7 +9624,7 @@
   *
   * @return the workflow configuration mode
   */
  public static boolean workflowConfigurationModeIsAuto()
  private static boolean workflowConfigurationModeIsAuto()
  {
    return directoryServer.workflowConfigurationMode
        == WorkflowConfigurationMode.AUTO;
opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
@@ -36,9 +36,18 @@
 * This class defines a daemon thread that will be used to monitor the server
 * shutdown process and may help nudge it along if it appears to get hung.
 */
public class ServerShutdownMonitor extends DirectoryThread
class ServerShutdownMonitor extends DirectoryThread
{
  /**
   * Time in milliseconds for the shutdown monitor to:
   * <ol>
   * <li>wait before sending interrupt to threads</li>
   * <li>wait before final shutdown</li>
   * </ol>
   */
  static final long WAIT_TIME = 30000;
  /**
   * Indicates whether the monitor has completed and the shutdown may be
   * finalized with a call to {@link System#exit()}.
   */
@@ -110,7 +119,7 @@
      // For the first milestone, we'll run for up to 30 seconds just checking
      // to see whether all threads have stopped yet.
      if (waitAllThreadsDied(30000))
      if (waitAllThreadsDied(WAIT_TIME))
      {
        return;
      }
@@ -129,7 +138,7 @@
        } catch (Exception e) {}
      }
      if (waitAllThreadsDied(30000))
      if (waitAllThreadsDied(WAIT_TIME))
      {
        return;
      }
opends/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.extensions;
import java.util.Map;
import org.opends.messages.Message;
@@ -38,12 +37,11 @@
import org.opends.server.types.DisconnectReason;
import org.opends.server.types.Operation;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a data structure for storing and interacting with a
 * Directory Server worker thread.
@@ -63,7 +61,7 @@
  private volatile boolean shutdownRequested;
  /**
   * Indicates whether this thread was stopped because the server threadnumber
   * Indicates whether this thread was stopped because the server thread number
   * was reduced.
   */
  private boolean stoppedByReducedThreadNumber;
@@ -72,13 +70,13 @@
  private boolean waitingForWork;
  /** The operation that this worker thread is currently processing. */
  private Operation operation;
  private volatile Operation operation;
  /** The handle to the actual thread for this worker thread. */
  private Thread workerThread;
  /** The work queue that this worker thread will service. */
  private TraditionalWorkQueue workQueue;
  private final TraditionalWorkQueue workQueue;
@@ -129,7 +127,7 @@
   */
  public boolean isActive()
  {
    return (isAlive() && (operation != null));
    return isAlive() && operation != null;
  }
@@ -148,7 +146,7 @@
      try
      {
        waitingForWork = true;
        operation = null;
        operation = null; // this line is necessary because next line can block
        operation = workQueue.nextOperation(this);
        waitingForWork = false;
opends/src/server/org/opends/server/replication/common/CSNGenerator.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
@@ -44,7 +44,7 @@
   * @see #lastTime
   */
  private int seqnum;
  private int serverId;
  private final int serverId;
  /**
   * Create a new {@link CSNGenerator}.
@@ -64,12 +64,12 @@
  /**
  * Create a new {@link CSNGenerator}.
  *
  * @param id id to use when creating {@link CSN}s.
  * @param serverId serverId to use when creating {@link CSN}s.
  * @param state This generator will be created in a way that makes sure that
  *              all {@link CSN}s generated will be larger than all the
  *              {@link CSN}s currently in state.
  */
  public CSNGenerator(int id, ServerState state)
  public CSNGenerator(int serverId, ServerState state)
  {
    this.lastTime = TimeThread.getTime();
    for (CSN csn : state)
@@ -78,12 +78,12 @@
      {
        this.lastTime = csn.getTime();
      }
      if (csn.getServerId() == id)
      if (csn.getServerId() == serverId)
      {
        this.seqnum = csn.getSeqnum();
      }
    }
    this.serverId = id;
    this.serverId = serverId;
  }
  /**
@@ -104,7 +104,7 @@
        lastTime = curTime;
      }
      if (++seqnum <= 0)
      if (++seqnum <= 0) // check no underflow happened
      {
        seqnum = 0;
        lastTime++;
@@ -155,7 +155,7 @@
        lastTime = ++rcvdTime;
      }
      if ((serverId == changeServerId) && (seqnum < changeSeqNum))
      if (serverId == changeServerId && seqnum < changeSeqNum)
      {
        seqnum = changeSeqNum;
      }
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -97,6 +97,9 @@
 *  processing a change received from the replicationServer service,
 *  handle conflict resolution,
 *  handle protocol messages from the replicationServer.
 * <p>
 * FIXME Move this class to org.opends.server.replication.service
 * or the equivalent package once this code is moved to a maven module.
 */
public final class LDAPReplicationDomain extends ReplicationDomain
       implements ConfigurationChangeListener<ReplicationDomainCfg>,
@@ -4095,9 +4098,7 @@
    // Now for bad data set status if needed
    if (forceBadDataSet)
    {
      // Go into bad data set status
      setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
      broker.signalStatusChange(status);
      signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
      logError(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC.get(getBaseDNString()));
      return; // Do not send changes to the replication server
    }
opends/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -22,28 +22,26 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS
 *      Portions copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.types.DN;
import org.opends.server.types.operation.PluginOperation;
/**
 * This class is use to store an operation currently
 * in progress and not yet committed in the database.
 */
public class PendingChange implements Comparable<PendingChange>
class PendingChange implements Comparable<PendingChange>
{
  private CSN csn;
  private final CSN csn;
  private boolean committed;
  private LDAPUpdateMsg msg;
  private PluginOperation op;
  private final PluginOperation op;
  private ServerState dependencyState;
  private DN targetDN;
  /**
   * Construct a new PendingChange.
@@ -51,7 +49,7 @@
   * @param op the operation to use
   * @param msg the message to use (can be null for local operations)
   */
  public PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
  PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
  {
    this.csn = csn;
    this.committed = false;
@@ -115,15 +113,6 @@
  }
  /**
   * Set the operation associated to this PendingChange.
   * @param op The operation associated to this PendingChange.
   */
  public void setOp(PluginOperation op)
  {
    this.op = op;
  }
  /**
   * Add the given CSN to the list of dependencies of this PendingChange.
   *
   * @param csn
@@ -152,29 +141,24 @@
    return state.cover(dependencyState);
  }
  /**
   * Get the Target DN of this message.
   *
   * @return The target DN of this message.
   */
  public DN getTargetDN()
  {
    synchronized (this)
    {
      if (targetDN == null)
      {
        targetDN = msg.getDN();
      }
      return targetDN;
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int compareTo(PendingChange o)
  {
    return getCSN().compareTo(o.getCSN());
    return csn.compareTo(o.csn);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName()
        + " committed=" + committed
        + ", csn=" + csn.toStringUI()
        + ", msg=[" + msg
        + "], isOperationSynchronized="
        + (op != null ? op.isSynchronizationOperation() : "false")
        + ", dependencyState="
        + (dependencyState != null ? dependencyState : "");
  }
}
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,8 +26,8 @@
 */
package org.opends.server.replication.plugin;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.server.replication.common.CSN;
@@ -51,19 +51,19 @@
  /**
   * A map used to store the pending changes.
   */
  private SortedMap<CSN, PendingChange> pendingChanges =
    new TreeMap<CSN, PendingChange>();
  private final TreeMap<CSN, PendingChange> pendingChanges =
      new TreeMap<CSN, PendingChange>();
  /**
   * The {@link CSNGenerator} to use to create new unique CSNs
   * for each operation done on the replication domain.
   */
  private CSNGenerator csnGenerator;
  private final CSNGenerator csnGenerator;
  /**
   * The ReplicationDomain that will be used to send UpdateMsg.
   */
  private ReplicationDomain domain;
  private final ReplicationDomain domain;
  private boolean recoveringOldChanges = false;
@@ -128,34 +128,32 @@
  synchronized CSN putLocalOperation(PluginOperation operation)
  {
    final CSN csn = csnGenerator.newCSN();
    final PendingChange change = new PendingChange(csn, operation, null);
    pendingChanges.put(csn, change);
    if (!operation.isSynchronizationOperation())
    {
      pendingChanges.put(csn, new PendingChange(csn, operation, null));
    }
    return csn;
  }
  /**
   * Push all committed local changes to the replicationServer service.
   *
   * @return The number of pushed updates.
   */
  synchronized int pushCommittedChanges()
  synchronized void pushCommittedChanges()
  {
    int numSentUpdates = 0;
    if (pendingChanges.isEmpty())
    // peek the oldest change
    Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry();
    if (firstEntry == null)
    {
      return numSentUpdates;
      return;
    }
    // peek the oldest CSN
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    PendingChange firstChange = firstEntry.getValue();
    while (firstChange != null && firstChange.isCommitted())
    {
      final PluginOperation op = firstChange.getOp();
      if (op != null && !op.isSynchronizationOperation())
      {
        numSentUpdates++;
        final LDAPUpdateMsg updateMsg = firstChange.getMsg();
        if (!recoveringOldChanges)
        {
@@ -168,20 +166,14 @@
          domain.getServerState().update(updateMsg.getCSN());
        }
      }
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        // peek the oldest CSN
        firstCSN = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstCSN);
      }
      // false warning: firstEntry will not be null if firstChange is not null
      pendingChanges.remove(firstEntry.getKey());
      // peek the oldest change
      firstEntry = pendingChanges.firstEntry();
      firstChange = firstEntry != null ? firstEntry.getValue() : null;
    }
    return numSentUpdates;
  }
  /**
@@ -189,16 +181,13 @@
   * push all committed local changes to the replicationServer service
   * in a single atomic operation.
   *
   *
   * @param csn The CSN of the update message that must be set as committed.
   * @param msg          The message associated to the update.
   *
   * @return The number of pushed updates.
   * @param msg The message associated to the update.
   */
  synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
  synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
  {
    commit(csn, msg);
    return pushCommittedChanges();
    pushCommittedChanges();
  }
  /**
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
@@ -48,7 +48,7 @@
 *
 * One of this object is instantiated for each ReplicationDomain.
 */
public final class RemotePendingChanges
final class RemotePendingChanges
{
  /**
   * A map used to store the pending changes.
@@ -124,7 +124,7 @@
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    while ((firstChange != null) && firstChange.isCommitted())
    while (firstChange != null && firstChange.isCommitted())
    {
      state.update(firstCSN);
      pendingChanges.remove(firstCSN);
@@ -196,17 +196,19 @@
  public synchronized boolean checkDependencies(AddOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    CSN csn = OperationContext.getCSN(op);
    PendingChange change = pendingChanges.get(csn);
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
@@ -215,7 +217,7 @@
             * Check is the operation to be run is a deleteOperation on the
             * same DN.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -227,7 +229,7 @@
             * Check if the operation to be run is an addOperation on a
             * parent of the current AddOperation.
             */
            if (pendingChange.getTargetDN().isAncestorOf(targetDn))
            if (pendingMsg.getDN().isAncestorOf(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -240,15 +242,15 @@
             * the same target DN as the ADD DN
             * or a ModifyDnOperation with new DN equals to the ADD DN parent
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            else
            {
              ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
              if (pendingModDn.newDNIsParent(targetDn))
              final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
              if (pendingModDn.newDNIsParent(targetDN))
              {
                hasDependencies = true;
                addDependency(change, pendingChange);
@@ -286,30 +288,26 @@
  public synchronized boolean checkDependencies(ModifyOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    CSN csn = OperationContext.getCSN(op);
    PendingChange change = pendingChanges.get(csn);
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg instanceof AddMsg)
        {
          if (pendingMsg instanceof AddMsg)
          // Check if the operation to be run is an addOperation on a same DN.
          if (pendingMsg.getDN().equals(targetDN))
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * same DN.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            hasDependencies = true;
            addDependency(change, pendingChange);
          }
        }
      }
@@ -342,29 +340,30 @@
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyDNMsg msg)
  private synchronized boolean checkDependencies(ModifyDNMsg msg)
  {
    boolean hasDependencies = false;
    CSN csn = msg.getCSN();
    PendingChange change = pendingChanges.get(csn);
    final CSN csn = msg.getCSN();
    final PendingChange change = pendingChanges.get(csn);
    if (change == null)
    {
      return false;
    }
    DN targetDn = change.getTargetDN();
    final DN targetDN = change.getMsg().getDN();
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            // Check if the target of the Delete is the same
            // as the new DN of this ModifyDN
            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
            if (msg.newDNIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -374,14 +373,14 @@
          {
            // Check if the Add Operation was done on the new parent of
            // the MODDN  operation
            if (msg.newParentIsEqual(pendingChange.getTargetDN()))
            if (msg.newParentIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            // Check if the AddOperation was done on the same DN as the
            // target DN of the MODDN operation
            if (pendingChange.getTargetDN().equals(targetDn))
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -391,7 +390,7 @@
          {
            // Check if the ModifyDNOperation was done from the new DN of
            // the MODDN operation
            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
            if (msg.newDNIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -431,17 +430,19 @@
  public synchronized boolean checkDependencies(DeleteOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    CSN csn = OperationContext.getCSN(op);
    PendingChange change = pendingChanges.get(csn);
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
@@ -450,7 +451,7 @@
             * Check if the operation to be run is a deleteOperation on a
             * children of the current DeleteOperation.
             */
            if (pendingChange.getTargetDN().isDescendantOf(targetDn))
            if (pendingMsg.getDN().isDescendantOf(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -462,7 +463,7 @@
             * Check if the operation to be run is an addOperation on a
             * parent of the current DeleteOperation.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
@@ -470,13 +471,13 @@
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
            final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
            /*
             * Check if the operation to be run is an ModifyDNOperation
             * on a children of the current DeleteOperation
             */
            if ((pendingChange.getTargetDN().isDescendantOf(targetDn)) ||
                (pendingModDn.newDNIsParent(targetDn)))
            if (pendingMsg.getDN().isDescendantOf(targetDN)
                || pendingModDn.newDNIsParent(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -89,12 +89,12 @@
   * topology. Using an AtomicReference to avoid leaking references to costly
   * threads.
   */
  private AtomicReference<MonitoringPublisher> monitoringPublisher =
  private final AtomicReference<MonitoringPublisher> monitoringPublisher =
      new AtomicReference<MonitoringPublisher>();
  /**
   * Maintains monitor data for the current domain.
   */
  private ReplicationDomainMonitor domainMonitor =
  private final ReplicationDomainMonitor domainMonitor =
      new ReplicationDomainMonitor(this);
  /**
@@ -119,7 +119,7 @@
  private final ReplicationDomainDB domainDB;
  /** The ReplicationServer that created the current instance. */
  private ReplicationServer localReplicationServer;
  private final ReplicationServer localReplicationServer;
  /**
   * The generationId of the current replication domain. The generationId is
@@ -161,7 +161,7 @@
   * The timer used to run the timeout code (timer tasks) for the assured update
   * messages we are waiting acks for.
   */
  private Timer assuredTimeoutTimer;
  private final Timer assuredTimeoutTimer;
  /**
   * Counter used to purge the timer tasks references in assuredTimeoutTimer,
   * every n number of treated assured messages.
@@ -187,8 +187,6 @@
    private boolean sendDSTopologyMsg;
    private int excludedDSForTopologyMsg = -1;
    /**
     * Enqueues a TopologyMsg for all the connected directory servers in order
     * to let them know the topology (every known DSs and RSs).
@@ -213,8 +211,6 @@
      }
    }
    /**
     * Enqueues a TopologyMsg for all the connected replication servers in order
     * to let them know our connected LDAP servers.
@@ -224,8 +220,6 @@
      sendRSTopologyMsg = true;
    }
    /**
     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
     * all other RS instances.
@@ -238,19 +232,28 @@
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }
    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }
    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return getClass().getSimpleName()
          + " pendingHeartbeats=" + pendingHeartbeats
          + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs
          + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs
          + ", sendRSTopologyMsg=" + sendRSTopologyMsg
          + ", sendDSTopologyMsg=" + sendDSTopologyMsg
          + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg;
    }
  }
  private final Object pendingStatusMessagesLock = new Object();
@@ -2117,7 +2120,7 @@
  /**
   * Clears the Db associated with that domain.
   */
  public void clearDbs()
  private void clearDbs()
  {
    try
    {
@@ -2803,7 +2806,7 @@
      }
    }
    if (pendingMsgs.sendRSTopologyMsg)
    if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty())
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2802,10 +2802,10 @@
    synchronized (startStopLock)
    {
      stopChangeTimeHeartBeatPublishing();
      stopRSHeartBeatMonitoring();
      shutdown = true;
      setConnectedRS(ConnectedRS.stopped());
      stopRSHeartBeatMonitoring();
      stopChangeTimeHeartBeatPublishing();
      deregisterReplicationMonitor();
    }
  }
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -214,7 +214,7 @@
  /**
   * Current status for this replicated domain.
   */
  protected ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
  /**
   * The tracer object for the debug logger.
@@ -2461,7 +2461,13 @@
   * event.
   * @param event The event that may make the status be changed
   */
  protected void setNewStatus(StatusMachineEvent event)
  protected void signalNewStatus(StatusMachineEvent event)
  {
    setNewStatus(event);
    broker.signalStatusChange(status);
  }
  private void setNewStatus(StatusMachineEvent event)
  {
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
@@ -3433,7 +3439,7 @@
   * receive this {@link UpdateMsg} through a call of the
   * {@link #processUpdate(UpdateMsg)} message.
   *
   * @param msg The UpdateMsg that should be pushed.
   * @param msg The UpdateMsg that should be published.
   */
  public void publish(UpdateMsg msg)
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -52,7 +52,7 @@
  }
  @Override
  protected void setNewStatus(StatusMachineEvent event)
  protected void signalNewStatus(StatusMachineEvent event)
  {
  }