From 62c7d9970ee34d102a78ce72a459274a97721cc6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Jun 2014 14:50:38 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3697) Change time heart beat change numbers should be synced with updates

---
 opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java                                              |   93 +++++--------
 opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java                                   |   16 +-
 opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                        |   35 ++--
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java                             |    4 
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java                                 |   59 +++-----
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                          |    7 
 opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java                                        |   15 +
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java                           |   91 ++++++------
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java                             |   12 +
 opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java                                |   12 -
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java                                  |   58 +++-----
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java |    2 
 12 files changed, 188 insertions(+), 216 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java b/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java
index 350b017..6c0a918 100644
--- a/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java
@@ -276,43 +276,43 @@
    * Returned when the user specified the --checkStartability option with other
    * options like printing the usage, dumping messages, displaying version, etc.
    */
-  private static int NOTHING_TO_DO = 0;
+  private static final int NOTHING_TO_DO = 0;
   /**
    * Returned when the user specified the --checkStartability option with
    * some incompatible arguments.
    */
-  private static int CHECK_ERROR = 1;
+  private static final int CHECK_ERROR = 1;
   /**
    * The server is already started.
    */
-  private static int SERVER_ALREADY_STARTED = 98;
+  private static final int SERVER_ALREADY_STARTED = 98;
   /**
    * The server must be started as detached process.
    */
-  private static int START_AS_DETACH = 99;
+  private static final int START_AS_DETACH = 99;
   /**
    * The server must be started as a non-detached process.
    */
-  private static int START_AS_NON_DETACH = 100;
+  private static final int START_AS_NON_DETACH = 100;
   /**
    * The server must be started as a window service.
    */
-  private static int START_AS_WINDOWS_SERVICE = 101;
+  private static final int START_AS_WINDOWS_SERVICE = 101;
   /**
    * The server must be started as detached and it is being called from the
    * Windows Service.
    */
-  private static int START_AS_DETACH_CALLED_FROM_WINDOWS_SERVICE = 102;
+  private static final int START_AS_DETACH_CALLED_FROM_WINDOWS_SERVICE = 102;
   /**
    * The server must be started as detached process and should not produce any
    * output.
    */
-  private static int START_AS_DETACH_QUIET = 103;
+  private static final int START_AS_DETACH_QUIET = 103;
   /**
    * The server must be started as non-detached process and should not produce
    * any output.
    */
-  private static int START_AS_NON_DETACH_QUIET = 104;
+  private static final int START_AS_NON_DETACH_QUIET = 104;
 
   /** Temporary context object, to provide instance methods instead of static methods. */
   private final DirectoryServerContext serverContext;
@@ -665,7 +665,7 @@
   private int lookthroughLimit;
 
   /** The current active persistent searches. */
-  private AtomicInteger activePSearches = new AtomicInteger(0);
+  private final AtomicInteger activePSearches = new AtomicInteger(0);
 
   /** The maximum number of concurrent persistent searches. */
   private int maxPSearches;
@@ -1173,8 +1173,7 @@
    * @throws  InitializationException  If a problem occurs while attempting to
    *                                   bootstrap the server.
    */
-  public void bootstrapServer()
-         throws InitializationException
+  private void bootstrapServer() throws InitializationException
   {
     // First, make sure that the server isn't currently running.  If it isn't,
     // then make sure that no other thread will try to start or bootstrap the
@@ -2134,7 +2133,7 @@
    *                                   the backends that is not related to the
    *                                   server configuration.
    */
-  public void initializeBackends() throws ConfigException, InitializationException
+  private void initializeBackends() throws ConfigException, InitializationException
   {
     backendConfigManager = new BackendConfigManager(serverContext);
     backendConfigManager.initializeBackendConfig();
@@ -2231,9 +2230,8 @@
    *                              workflow conflicts with the workflow
    *                              ID of an existing workflow.
    */
-  public static void createAndRegisterWorkflowsWithDefaultNetworkGroup(
-      Backend backend
-      ) throws DirectoryException
+  private static void createAndRegisterWorkflowsWithDefaultNetworkGroup(
+      Backend backend) throws DirectoryException
   {
     // Create a workflow for each backend base DN and register the workflow
     // with the default/internal/admin network group.
@@ -2264,10 +2262,8 @@
    *                              workflow conflicts with the workflow
    *                              ID of an existing workflow.
    */
-  public static WorkflowImpl createWorkflow(
-      DN      baseDN,
-      Backend backend
-      ) throws DirectoryException
+  private static WorkflowImpl createWorkflow(DN baseDN, Backend backend)
+      throws DirectoryException
   {
     String backendID = backend.getBackendID();
 
@@ -2458,7 +2454,7 @@
    *                                   attempting to initialize and start the
    *                                   Directory Server.
    */
-  public void configureWorkflowsManual()
+  private void configureWorkflowsManual()
       throws ConfigException, InitializationException
   {
     // First of all re-initialize the current workflow configuration
@@ -2545,7 +2541,7 @@
    *                                   the group manager that is not related to
    *                                   the server configuration.
    */
-  public void initializeGroupManager()
+  private void initializeGroupManager()
          throws ConfigException, InitializationException
   {
     try
@@ -7550,16 +7546,9 @@
       directoryServer.shuttingDown = true;
     }
 
-    try {
-      directoryServer.configHandler.getConfigRootEntry();
-    } catch (Exception e) {
-
-    }
-
     // Send an alert notification that the server is shutting down.
-    LocalizableMessage message = NOTE_SERVER_SHUTDOWN.get(className, reason);
     sendAlertNotification(directoryServer, ALERT_TYPE_SERVER_SHUTDOWN,
-            message);
+        NOTE_SERVER_SHUTDOWN.get(className, reason));
 
 
     // Create a shutdown monitor that will watch the rest of the shutdown
@@ -7583,7 +7572,18 @@
     }
     directoryServer.connectionHandlers.clear();
 
+    if (directoryServer.workQueue != null)
+    {
+      directoryServer.workQueue.finalizeWorkQueue(reason);
+      directoryServer.workQueue.waitUntilIdle(ServerShutdownMonitor.WAIT_TIME);
+    }
 
+    // shutdown replication
+    for (SynchronizationProvider provider :
+         directoryServer.synchronizationProviders)
+    {
+      provider.finalizeSynchronizationProvider();
+    }
 
     // Call the shutdown plugins, and then finalize all the plugins defined in
     // the server.
@@ -7593,14 +7593,6 @@
       directoryServer.pluginConfigManager.finalizePlugins();
     }
 
-
-    // shutdown the Synchronization Providers
-    for (SynchronizationProvider provider :
-         directoryServer.synchronizationProviders)
-    {
-      provider.finalizeSynchronizationProvider();
-    }
-
     // Deregister the shutdown hook.
     if (directoryServer.shutdownHook != null)
     {
@@ -7612,13 +7604,6 @@
     }
 
 
-    // Stop the work queue.
-    if (directoryServer.workQueue != null)
-    {
-      directoryServer.workQueue.finalizeWorkQueue(reason);
-    }
-
-
     // Notify all the shutdown listeners.
     for (ServerShutdownListener shutdownListener :
          directoryServer.shutdownListeners)
@@ -7776,8 +7761,8 @@
         {
           logger.traceException(e2);
 
-          logger.warn(WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK, backend
-              .getBackendID(), stackTraceToSingleLineString(e2));
+          logger.warn(WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK,
+              backend.getBackendID(), stackTraceToSingleLineString(e2));
           // FIXME -- Do we need to send an admin alert?
         }
       }
@@ -7795,12 +7780,10 @@
     }
 
     // Release exclusive lock held on server.lock file
-    String serverLockFileName = LockFileManager.getServerLockFileName();
-    StringBuilder failureReason = new StringBuilder();
-
     try {
-        if (!LockFileManager.releaseLock(serverLockFileName,
-                failureReason)) {
+        String serverLockFileName = LockFileManager.getServerLockFileName();
+        StringBuilder failureReason = new StringBuilder();
+        if (!LockFileManager.releaseLock(serverLockFileName, failureReason)) {
             logger.info(NOTE_SERVER_SHUTDOWN, className, failureReason);
         }
     } catch (Exception e) {
@@ -9008,13 +8991,11 @@
    * @return Returns the class loader to be used with this directory
    *         server application.
    */
-  public static ClassLoader getClassLoader()
+  private static ClassLoader getClassLoader()
   {
     return ClassLoaderProvider.getInstance().getClassLoader();
   }
 
-
-
   /**
    * Loads the named class using this directory server application's
    * class loader.
@@ -9211,7 +9192,7 @@
    *
    * @return the workflow configuration mode
    */
-  public static boolean workflowConfigurationModeIsAuto()
+  private static boolean workflowConfigurationModeIsAuto()
   {
     return directoryServer.workflowConfigurationMode
         == WorkflowConfigurationMode.AUTO;
diff --git a/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java b/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java
index c7fe4f2..44b55d0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java
@@ -36,9 +36,18 @@
  * This class defines a daemon thread that will be used to monitor the server
  * shutdown process and may help nudge it along if it appears to get hung.
  */
-public class ServerShutdownMonitor extends DirectoryThread
+class ServerShutdownMonitor extends DirectoryThread
 {
   /**
+   * Time in milliseconds for the shutdown monitor to:
+   * <ol>
+   * <li>wait before sending interrupt to threads</li>
+   * <li>wait before final shutdown</li>
+   * </ol>
+   */
+  static final long WAIT_TIME = 30000;
+
+  /**
    * Indicates whether the monitor has completed and the shutdown may be
    * finalized with a call to {@link System#exit()}.
    */
@@ -110,7 +119,7 @@
 
       // For the first milestone, we'll run for up to 30 seconds just checking
       // to see whether all threads have stopped yet.
-      if (waitAllThreadsDied(30000))
+      if (waitAllThreadsDied(WAIT_TIME))
       {
         return;
       }
@@ -129,7 +138,7 @@
         } catch (Exception e) {}
       }
 
-      if (waitAllThreadsDied(30000))
+      if (waitAllThreadsDied(WAIT_TIME))
       {
         return;
       }
diff --git a/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java b/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
index 9508ff9..81cdcbd 100644
--- a/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
+++ b/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
@@ -26,7 +26,6 @@
  */
 package org.opends.server.extensions;
 
-
 import java.util.Map;
 
 import org.forgerock.i18n.LocalizableMessage;
@@ -40,7 +39,6 @@
 import static org.opends.messages.CoreMessages.*;
 import static org.opends.server.util.StaticUtils.*;
 
-
 /**
  * This class defines a data structure for storing and interacting with a
  * Directory Server worker thread.
@@ -57,7 +55,7 @@
   private volatile boolean shutdownRequested;
 
   /**
-   * Indicates whether this thread was stopped because the server threadnumber
+   * Indicates whether this thread was stopped because the server thread number
    * was reduced.
    */
   private boolean stoppedByReducedThreadNumber;
@@ -66,13 +64,13 @@
   private boolean waitingForWork;
 
   /** The operation that this worker thread is currently processing. */
-  private Operation operation;
+  private volatile Operation operation;
 
   /** The handle to the actual thread for this worker thread. */
   private Thread workerThread;
 
   /** The work queue that this worker thread will service. */
-  private TraditionalWorkQueue workQueue;
+  private final TraditionalWorkQueue workQueue;
 
 
 
@@ -123,7 +121,7 @@
    */
   public boolean isActive()
   {
-    return (isAlive() && (operation != null));
+    return isAlive() && operation != null;
   }
 
 
@@ -142,7 +140,7 @@
       try
       {
         waitingForWork = true;
-        operation = null;
+        operation = null; // this line is necessary because next line can block
         operation = workQueue.nextOperation(this);
         waitingForWork = false;
 
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java b/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java
index 205f0dc..33ea83d 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.common;
 
@@ -44,7 +44,7 @@
    * @see #lastTime
    */
   private int seqnum;
-  private int serverId;
+  private final int serverId;
 
   /**
    * Create a new {@link CSNGenerator}.
@@ -64,12 +64,12 @@
   /**
   * Create a new {@link CSNGenerator}.
   *
-  * @param id id to use when creating {@link CSN}s.
+  * @param serverId serverId to use when creating {@link CSN}s.
   * @param state This generator will be created in a way that makes sure that
   *              all {@link CSN}s generated will be larger than all the
   *              {@link CSN}s currently in state.
   */
-  public CSNGenerator(int id, ServerState state)
+  public CSNGenerator(int serverId, ServerState state)
   {
     this.lastTime = TimeThread.getTime();
     for (CSN csn : state)
@@ -78,12 +78,12 @@
       {
         this.lastTime = csn.getTime();
       }
-      if (csn.getServerId() == id)
+      if (csn.getServerId() == serverId)
       {
         this.seqnum = csn.getSeqnum();
       }
     }
-    this.serverId = id;
+    this.serverId = serverId;
   }
 
   /**
@@ -104,7 +104,7 @@
         lastTime = curTime;
       }
 
-      if (++seqnum <= 0)
+      if (++seqnum <= 0) // check no underflow happened
       {
         seqnum = 0;
         lastTime++;
@@ -155,7 +155,7 @@
         lastTime = ++rcvdTime;
       }
 
-      if ((serverId == changeServerId) && (seqnum < changeSeqNum))
+      if (serverId == changeServerId && seqnum < changeSeqNum)
       {
         seqnum = changeSeqNum;
       }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index dfd3cd6..4b15777 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -104,6 +104,9 @@
  *  processing a change received from the replicationServer service,
  *  handle conflict resolution,
  *  handle protocol messages from the replicationServer.
+ * <p>
+ * FIXME Move this class to org.opends.server.replication.service
+ * or the equivalent package once this code is moved to a maven module.
  */
 public final class LDAPReplicationDomain extends ReplicationDomain
        implements ConfigurationChangeListener<ReplicationDomainCfg>,
@@ -4067,9 +4070,7 @@
     // Now for bad data set status if needed
     if (forceBadDataSet)
     {
-      // Go into bad data set status
-      setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
-      broker.signalStatusChange(status);
+      signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
       logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDNString());
       return; // Do not send changes to the replication server
     }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
index b9f8203..e694ab4 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -22,28 +22,26 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions copyright 2013 ForgeRock AS
+ *      Portions copyright 2014 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.LDAPUpdateMsg;
-import org.opends.server.types.DN;
 import org.opends.server.types.operation.PluginOperation;
 
 /**
  * This class is use to store an operation currently
  * in progress and not yet committed in the database.
  */
-public class PendingChange implements Comparable<PendingChange>
+class PendingChange implements Comparable<PendingChange>
 {
-  private CSN csn;
+  private final CSN csn;
   private boolean committed;
   private LDAPUpdateMsg msg;
-  private PluginOperation op;
+  private final PluginOperation op;
   private ServerState dependencyState;
-  private DN targetDN;
 
   /**
    * Construct a new PendingChange.
@@ -51,7 +49,7 @@
    * @param op the operation to use
    * @param msg the message to use (can be null for local operations)
    */
-  public PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
+  PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
   {
     this.csn = csn;
     this.committed = false;
@@ -115,15 +113,6 @@
   }
 
   /**
-   * Set the operation associated to this PendingChange.
-   * @param op The operation associated to this PendingChange.
-   */
-  public void setOp(PluginOperation op)
-  {
-    this.op = op;
-  }
-
-  /**
    * Add the given CSN to the list of dependencies of this PendingChange.
    *
    * @param csn
@@ -152,29 +141,24 @@
     return state.cover(dependencyState);
   }
 
-  /**
-   * Get the Target DN of this message.
-   *
-   * @return The target DN of this message.
-   */
-  public DN getTargetDN()
-  {
-    synchronized (this)
-    {
-      if (targetDN == null)
-      {
-        targetDN = msg.getDN();
-      }
-      return targetDN;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public int compareTo(PendingChange o)
   {
-    return getCSN().compareTo(o.getCSN());
+    return csn.compareTo(o.csn);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName()
+        + " committed=" + committed
+        + ", csn=" + csn.toStringUI()
+        + ", msg=[" + msg
+        + "], isOperationSynchronized="
+        + (op != null ? op.isSynchronizationOperation() : "false")
+        + ", dependencyState="
+        + (dependencyState != null ? dependencyState : "");
   }
 }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
index b66f7b6..42ee696 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,8 +26,8 @@
  */
 package org.opends.server.replication.plugin;
 
+import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.opends.server.replication.common.CSN;
@@ -51,19 +51,19 @@
   /**
    * A map used to store the pending changes.
    */
-  private SortedMap<CSN, PendingChange> pendingChanges =
-    new TreeMap<CSN, PendingChange>();
+  private final TreeMap<CSN, PendingChange> pendingChanges =
+      new TreeMap<CSN, PendingChange>();
 
   /**
    * The {@link CSNGenerator} to use to create new unique CSNs
    * for each operation done on the replication domain.
    */
-  private CSNGenerator csnGenerator;
+  private final CSNGenerator csnGenerator;
 
   /**
    * The ReplicationDomain that will be used to send UpdateMsg.
    */
-  private ReplicationDomain domain;
+  private final ReplicationDomain domain;
 
   private boolean recoveringOldChanges = false;
 
@@ -128,34 +128,32 @@
   synchronized CSN putLocalOperation(PluginOperation operation)
   {
     final CSN csn = csnGenerator.newCSN();
-    final PendingChange change = new PendingChange(csn, operation, null);
-    pendingChanges.put(csn, change);
+    if (!operation.isSynchronizationOperation())
+    {
+      pendingChanges.put(csn, new PendingChange(csn, operation, null));
+    }
     return csn;
   }
 
   /**
    * Push all committed local changes to the replicationServer service.
-   *
-   * @return The number of pushed updates.
    */
-  synchronized int pushCommittedChanges()
+  synchronized void pushCommittedChanges()
   {
-    int numSentUpdates = 0;
-    if (pendingChanges.isEmpty())
+    // peek the oldest change
+    Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry();
+    if (firstEntry == null)
     {
-      return numSentUpdates;
+      return;
     }
 
-    // peek the oldest CSN
-    CSN firstCSN = pendingChanges.firstKey();
-    PendingChange firstChange = pendingChanges.get(firstCSN);
+    PendingChange firstChange = firstEntry.getValue();
 
     while (firstChange != null && firstChange.isCommitted())
     {
       final PluginOperation op = firstChange.getOp();
       if (op != null && !op.isSynchronizationOperation())
       {
-        numSentUpdates++;
         final LDAPUpdateMsg updateMsg = firstChange.getMsg();
         if (!recoveringOldChanges)
         {
@@ -168,20 +166,14 @@
           domain.getServerState().update(updateMsg.getCSN());
         }
       }
-      pendingChanges.remove(firstCSN);
 
-      if (pendingChanges.isEmpty())
-      {
-        firstChange = null;
-      }
-      else
-      {
-        // peek the oldest CSN
-        firstCSN = pendingChanges.firstKey();
-        firstChange = pendingChanges.get(firstCSN);
-      }
+      // false warning: firstEntry will not be null if firstChange is not null
+      pendingChanges.remove(firstEntry.getKey());
+
+      // peek the oldest change
+      firstEntry = pendingChanges.firstEntry();
+      firstChange = firstEntry != null ? firstEntry.getValue() : null;
     }
-    return numSentUpdates;
   }
 
   /**
@@ -189,16 +181,13 @@
    * push all committed local changes to the replicationServer service
    * in a single atomic operation.
    *
-   *
    * @param csn The CSN of the update message that must be set as committed.
-   * @param msg          The message associated to the update.
-   *
-   * @return The number of pushed updates.
+   * @param msg The message associated to the update.
    */
-  synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
+  synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
   {
     commit(csn, msg);
-    return pushCommittedChanges();
+    pushCommittedChanges();
   }
 
   /**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index bb7dd46..5126be4 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -48,7 +48,7 @@
  *
  * One of this object is instantiated for each ReplicationDomain.
  */
-public final class RemotePendingChanges
+final class RemotePendingChanges
 {
   /**
    * A map used to store the pending changes.
@@ -124,7 +124,7 @@
     CSN firstCSN = pendingChanges.firstKey();
     PendingChange firstChange = pendingChanges.get(firstCSN);
 
-    while ((firstChange != null) && firstChange.isCommitted())
+    while (firstChange != null && firstChange.isCommitted())
     {
       state.update(firstCSN);
       pendingChanges.remove(firstCSN);
@@ -196,17 +196,19 @@
   public synchronized boolean checkDependencies(AddOperation op)
   {
     boolean hasDependencies = false;
-    DN targetDn = op.getEntryDN();
-    CSN csn = OperationContext.getCSN(op);
-    PendingChange change = pendingChanges.get(csn);
+    final DN targetDN = op.getEntryDN();
+    final CSN csn = OperationContext.getCSN(op);
+    final PendingChange change = pendingChanges.get(csn);
     if (change == null)
+    {
       return false;
+    }
 
     for (PendingChange pendingChange : pendingChanges.values())
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
         if (pendingMsg != null)
         {
           if (pendingMsg instanceof DeleteMsg)
@@ -215,7 +217,7 @@
              * Check is the operation to be run is a deleteOperation on the
              * same DN.
              */
-            if (pendingChange.getTargetDN().equals(targetDn))
+            if (pendingMsg.getDN().equals(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -227,7 +229,7 @@
              * Check if the operation to be run is an addOperation on a
              * parent of the current AddOperation.
              */
-            if (pendingChange.getTargetDN().isAncestorOf(targetDn))
+            if (pendingMsg.getDN().isAncestorOf(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -240,15 +242,15 @@
              * the same target DN as the ADD DN
              * or a ModifyDnOperation with new DN equals to the ADD DN parent
              */
-            if (pendingChange.getTargetDN().equals(targetDn))
+            if (pendingMsg.getDN().equals(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
             }
             else
             {
-              ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
-              if (pendingModDn.newDNIsParent(targetDn))
+              final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+              if (pendingModDn.newDNIsParent(targetDN))
               {
                 hasDependencies = true;
                 addDependency(change, pendingChange);
@@ -286,30 +288,26 @@
   public synchronized boolean checkDependencies(ModifyOperation op)
   {
     boolean hasDependencies = false;
-    DN targetDn = op.getEntryDN();
-    CSN csn = OperationContext.getCSN(op);
-    PendingChange change = pendingChanges.get(csn);
+    final DN targetDN = op.getEntryDN();
+    final CSN csn = OperationContext.getCSN(op);
+    final PendingChange change = pendingChanges.get(csn);
     if (change == null)
+    {
       return false;
+    }
 
     for (PendingChange pendingChange : pendingChanges.values())
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
-        if (pendingMsg != null)
+        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        if (pendingMsg instanceof AddMsg)
         {
-          if (pendingMsg instanceof AddMsg)
+          // Check if the operation to be run is an addOperation on a same DN.
+          if (pendingMsg.getDN().equals(targetDN))
           {
-            /*
-             * Check if the operation to be run is an addOperation on a
-             * same DN.
-             */
-            if (pendingChange.getTargetDN().equals(targetDn))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
+            hasDependencies = true;
+            addDependency(change, pendingChange);
           }
         }
       }
@@ -342,29 +340,30 @@
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(ModifyDNMsg msg)
+  private synchronized boolean checkDependencies(ModifyDNMsg msg)
   {
     boolean hasDependencies = false;
-    CSN csn = msg.getCSN();
-    PendingChange change = pendingChanges.get(csn);
+    final CSN csn = msg.getCSN();
+    final PendingChange change = pendingChanges.get(csn);
     if (change == null)
+    {
       return false;
+    }
 
-    DN targetDn = change.getTargetDN();
-
+    final DN targetDN = change.getMsg().getDN();
 
     for (PendingChange pendingChange : pendingChanges.values())
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
         if (pendingMsg != null)
         {
           if (pendingMsg instanceof DeleteMsg)
           {
             // Check if the target of the Delete is the same
             // as the new DN of this ModifyDN
-            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+            if (msg.newDNIsEqual(pendingMsg.getDN()))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -374,14 +373,14 @@
           {
             // Check if the Add Operation was done on the new parent of
             // the MODDN  operation
-            if (msg.newParentIsEqual(pendingChange.getTargetDN()))
+            if (msg.newParentIsEqual(pendingMsg.getDN()))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
             }
             // Check if the AddOperation was done on the same DN as the
             // target DN of the MODDN operation
-            if (pendingChange.getTargetDN().equals(targetDn))
+            if (pendingMsg.getDN().equals(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -391,7 +390,7 @@
           {
             // Check if the ModifyDNOperation was done from the new DN of
             // the MODDN operation
-            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+            if (msg.newDNIsEqual(pendingMsg.getDN()))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -431,17 +430,19 @@
   public synchronized boolean checkDependencies(DeleteOperation op)
   {
     boolean hasDependencies = false;
-    DN targetDn = op.getEntryDN();
-    CSN csn = OperationContext.getCSN(op);
-    PendingChange change = pendingChanges.get(csn);
+    final DN targetDN = op.getEntryDN();
+    final CSN csn = OperationContext.getCSN(op);
+    final PendingChange change = pendingChanges.get(csn);
     if (change == null)
+    {
       return false;
+    }
 
     for (PendingChange pendingChange : pendingChanges.values())
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
         if (pendingMsg != null)
         {
           if (pendingMsg instanceof DeleteMsg)
@@ -450,7 +451,7 @@
              * Check if the operation to be run is a deleteOperation on a
              * children of the current DeleteOperation.
              */
-            if (pendingChange.getTargetDN().isDescendantOf(targetDn))
+            if (pendingMsg.getDN().isDescendantOf(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -462,7 +463,7 @@
              * Check if the operation to be run is an addOperation on a
              * parent of the current DeleteOperation.
              */
-            if (pendingChange.getTargetDN().equals(targetDn))
+            if (pendingMsg.getDN().equals(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
@@ -470,13 +471,13 @@
           }
           else if (pendingMsg instanceof ModifyDNMsg)
           {
-            ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
+            final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
             /*
              * Check if the operation to be run is an ModifyDNOperation
              * on a children of the current DeleteOperation
              */
-            if ((pendingChange.getTargetDN().isDescendantOf(targetDn)) ||
-                (pendingModDn.newDNIsParent(targetDn)))
+            if (pendingMsg.getDN().isDescendantOf(targetDN)
+                || pendingModDn.newDNIsParent(targetDN))
             {
               hasDependencies = true;
               addDependency(change, pendingChange);
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9aefb0b..dc93aa1 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -86,12 +86,12 @@
    * topology. Using an AtomicReference to avoid leaking references to costly
    * threads.
    */
-  private AtomicReference<MonitoringPublisher> monitoringPublisher =
+  private final AtomicReference<MonitoringPublisher> monitoringPublisher =
       new AtomicReference<MonitoringPublisher>();
   /**
    * Maintains monitor data for the current domain.
    */
-  private ReplicationDomainMonitor domainMonitor =
+  private final ReplicationDomainMonitor domainMonitor =
       new ReplicationDomainMonitor(this);
 
   /**
@@ -116,7 +116,7 @@
 
   private final ReplicationDomainDB domainDB;
   /** The ReplicationServer that created the current instance. */
-  private ReplicationServer localReplicationServer;
+  private final ReplicationServer localReplicationServer;
 
   /**
    * The generationId of the current replication domain. The generationId is
@@ -158,7 +158,7 @@
    * The timer used to run the timeout code (timer tasks) for the assured update
    * messages we are waiting acks for.
    */
-  private Timer assuredTimeoutTimer;
+  private final Timer assuredTimeoutTimer;
   /**
    * Counter used to purge the timer tasks references in assuredTimeoutTimer,
    * every n number of treated assured messages.
@@ -186,8 +186,6 @@
     private boolean sendDSTopologyMsg;
     private int excludedDSForTopologyMsg = -1;
 
-
-
     /**
      * Enqueues a TopologyMsg for all the connected directory servers in order
      * to let them know the topology (every known DSs and RSs).
@@ -212,8 +210,6 @@
       }
     }
 
-
-
     /**
      * Enqueues a TopologyMsg for all the connected replication servers in order
      * to let them know our connected LDAP servers.
@@ -223,8 +219,6 @@
       sendRSTopologyMsg = true;
     }
 
-
-
     /**
      * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
      * all other RS instances.
@@ -237,19 +231,28 @@
       pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
     }
 
-
-
     private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
     {
       pendingDSMonitorMsgs.put(dsServerId, msg);
     }
 
-
-
     private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
     {
       pendingRSMonitorMsgs.put(rsServerId, msg);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return getClass().getSimpleName()
+          + " pendingHeartbeats=" + pendingHeartbeats
+          + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs
+          + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs
+          + ", sendRSTopologyMsg=" + sendRSTopologyMsg
+          + ", sendDSTopologyMsg=" + sendDSTopologyMsg
+          + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg;
+    }
   }
 
   private final Object pendingStatusMessagesLock = new Object();
@@ -2086,7 +2089,7 @@
   /**
    * Clears the Db associated with that domain.
    */
-  public void clearDbs()
+  private void clearDbs()
   {
     try
     {
@@ -2768,7 +2771,7 @@
       }
     }
 
-    if (pendingMsgs.sendRSTopologyMsg)
+    if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty())
     {
       final TopologyMsg topoMsg = createTopologyMsgForRS();
       for (ServerHandler handler : connectedRSs.values())
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c6b4149..5d05008 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2777,10 +2777,10 @@
 
     synchronized (startStopLock)
     {
+      stopChangeTimeHeartBeatPublishing();
+      stopRSHeartBeatMonitoring();
       shutdown = true;
       setConnectedRS(ConnectedRS.stopped());
-      stopRSHeartBeatMonitoring();
-      stopChangeTimeHeartBeatPublishing();
       deregisterReplicationMonitor();
     }
   }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index de653f3..ce9d714 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -210,7 +210,7 @@
   /**
    * Current status for this replicated domain.
    */
-  protected ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
+  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
   /** The configuration of the replication domain. */
@@ -2432,7 +2432,13 @@
    * event.
    * @param event The event that may make the status be changed
    */
-  protected void setNewStatus(StatusMachineEvent event)
+  protected void signalNewStatus(StatusMachineEvent event)
+  {
+    setNewStatus(event);
+    broker.signalStatusChange(status);
+  }
+
+  private void setNewStatus(StatusMachineEvent event)
   {
     ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
     if (newStatus == ServerStatus.INVALID_STATUS)
@@ -3400,7 +3406,7 @@
    * receive this {@link UpdateMsg} through a call of the
    * {@link #processUpdate(UpdateMsg)} message.
    *
-   * @param msg The UpdateMsg that should be pushed.
+   * @param msg The UpdateMsg that should be published.
    */
   public void publish(UpdateMsg msg)
   {
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index cae6348..86eda32 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -52,7 +52,7 @@
   }
 
   @Override
-  protected void setNewStatus(StatusMachineEvent event)
+  protected void signalNewStatus(StatusMachineEvent event)
   {
   }
 

--
Gitblit v1.10.0