From 856fdd0571358c660afaf379f8e774ab8b24f05c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock 

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java           |   57 ++--
 opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java                               |   87 +++----
 opends/src/server/org/opends/server/replication/service/ListenerThread.java                                      |   28 +-
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |  194 ++++++++-------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java       |   37 +-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java    |  116 +++-----
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java |   49 +--
 opends/src/server/org/opends/server/replication/plugin/ReplayThread.java                                         |   27 +-
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |   85 ++----
 9 files changed, 311 insertions(+), 369 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 4a7af7e..c86bc38 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@
  *      Copyright 2006-2010 Sun Microsystems, Inc.
  *      Portions Copyright 2011-2013 ForgeRock AS
  */
-
 package org.opends.server.replication.plugin;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -37,14 +36,12 @@
 import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.*;
 
-import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
@@ -63,14 +60,7 @@
 import org.opends.server.backends.jeb.BackendImpl;
 import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
-import org.opends.server.core.AddOperation;
-import org.opends.server.core.DeleteOperation;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.LockFileManager;
-import org.opends.server.core.ModifyDNOperation;
-import org.opends.server.core.ModifyDNOperationBasis;
-import org.opends.server.core.ModifyOperation;
-import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.core.*;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.protocols.asn1.ASN1Exception;
 import org.opends.server.protocols.internal.InternalClientConnection;
@@ -80,12 +70,7 @@
 import org.opends.server.protocols.ldap.LDAPControl;
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.protocols.ldap.LDAPModification;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
@@ -93,16 +78,7 @@
 import org.opends.server.tasks.PurgeConflictsHistoricalTask;
 import org.opends.server.tasks.TaskUtils;
 import org.opends.server.types.*;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.types.operation.*;
 import org.opends.server.util.LDIFReader;
 import org.opends.server.util.TimeThread;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -191,8 +167,10 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  // The update to replay message queue where the listener thread is going to
-  // push incoming update messages.
+  /**
+   * The update to replay message queue where the listener thread is going to
+   * push incoming update messages.
+   */
   private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
   private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
   private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
@@ -238,9 +216,11 @@
   private volatile boolean disabled = false;
   private volatile boolean stateSavingDisabled = false;
 
-  // This list is used to temporary store operations that needs
-  // to be replayed at session establishment time.
-  private final TreeMap<ChangeNumber, FakeOperation> replayOperations  =
+  /**
+   * This list is used to temporary store operations that needs to be replayed
+   * at session establishment time.
+   */
+  private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
     new TreeMap<ChangeNumber, FakeOperation>();
 
   /**
@@ -288,7 +268,7 @@
    * Fractional replication variables.
    */
 
-  // Holds the fractional configuration for this domain, if any.
+  /** Holds the fractional configuration for this domain, if any. */
   private FractionalConfig fractionalConfig = null;
 
   /**
@@ -341,29 +321,39 @@
    * fractionalFilterOperation(PreOperationModifyOperation
    *  modifyOperation, boolean performFiltering) method
    */
-  // The operation contains attributes subject to fractional filtering according
-  // to the fractional configuration
+  /**
+   * The operation contains attributes subject to fractional filtering according
+   * to the fractional configuration.
+   */
   private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
-  // The operation contains no attributes subject to fractional filtering
-  // according to the fractional configuration
+  /**
+   * The operation contains no attributes subject to fractional filtering
+   * according to the fractional configuration.
+   */
   private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
-  // The operation should become a no-op
+  /** The operation should become a no-op. */
   private static final int FRACTIONAL_BECOME_NO_OP = 3;
 
-  // This configuration boolean indicates if this ReplicationDomain should log
-  // ChangeNumbers.
+  /**
+   * This configuration boolean indicates if this ReplicationDomain should log
+   * ChangeNumbers.
+   */
   private boolean logChangeNumber = false;
 
-  // This configuration integer indicates the time the domain keeps the
-  // historical information necessary to solve conflicts.
-  // When a change stored in the historical part of the user entry has a date
-  // (from its replication ChangeNumber) older than this delay, it is candidate
-  // to be purged.
+  /**
+   * This configuration integer indicates the time the domain keeps the
+   * historical information necessary to solve conflicts.<br>
+   * When a change stored in the historical part of the user entry has a date
+   * (from its replication ChangeNumber) older than this delay, it is candidate
+   * to be purged.
+   */
   private long histPurgeDelayInMilliSec = 0;
 
-  // The last change number purged in this domain. Allows to have a continuous
-  // purging process from one purge processing (task run) to the next one.
-  // Values 0 when the server starts.
+  /**
+   * The last change number purged in this domain. Allows to have a continuous
+   * purging process from one purge processing (task run) to the next one.
+   * Values 0 when the server starts.
+   */
   private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
 
   /**
@@ -752,10 +742,8 @@
     if (fractionalConfig.isFractional())
     {
       // Set new fractional configuration values
-      if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
-        fractionalConfig.setFractionalExclusive(true);
-      else
-        fractionalConfig.setFractionalExclusive(false);
+      fractionalConfig.setFractionalExclusive(
+          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
       fractionalConfig.setFractionalSpecificClassesAttributes(
         newFractionalConfig.getFractionalSpecificClassesAttributes());
       fractionalConfig.setFractionalAllClassesAttributes(
@@ -950,10 +938,8 @@
     // Set stored fractional configuration values
     if (storedFractionalConfig.isFractional())
     {
-      if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
-        storedFractionalConfig.setFractionalExclusive(true);
-      else
-        storedFractionalConfig.setFractionalExclusive(false);
+      storedFractionalConfig.setFractionalExclusive(
+          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
     }
     storedFractionalConfig.setFractionalSpecificClassesAttributes(
       storedFractionalSpecificClassesAttributes);
@@ -1434,12 +1420,10 @@
             AttributeValue rdnAttributeValue =
               entryRdn.getAttributeValue(attributeType);
             List<Attribute> attrList = attributesMap.get(attributeType);
-            Iterator<Attribute> attrIt = attrList.iterator();
             AttributeValue sameAttrValue = null;
             //    Locate the attribute value identical to the one in the RDN
-            while(attrIt.hasNext())
+            for (Attribute attr : attrList)
             {
-              Attribute attr = attrIt.next();
               if (attr.contains(rdnAttributeValue))
               {
                 for (AttributeValue attrValue : attr) {
@@ -2577,9 +2561,12 @@
   /**
    * Create and replay a synchronized Operation from an UpdateMsg.
    *
-   * @param msg The UpdateMsg to be replayed.
+   * @param msg
+   *          The UpdateMsg to be replayed.
+   * @param shutdown
+   *          whether the server initiated shutdown
    */
-  public void replay(LDAPUpdateMsg msg)
+  public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
   {
     Operation op = null;
     boolean replayDone = false;
@@ -2599,6 +2586,11 @@
 
         while ((!dependency) && (!replayDone) && (retryCount-- > 0))
         {
+          if (shutdown.get())
+          {
+            // shutdown initiated, let's leave
+            return;
+          }
           // Try replay the operation
           op.setInternalOperation(true);
           op.setSynchronizationOperation(true);
@@ -2622,6 +2614,25 @@
               // renamed by a more recent modify DN.
               replayDone = true;
             }
+            else if (result == ResultCode.BUSY)
+            {
+              /*
+               * We probably could not get a lock (OPENDJ-885). Give the server
+               * another chance to process this operation immediately.
+               */
+              Thread.yield();
+              continue;
+            }
+            else if (result == ResultCode.UNAVAILABLE)
+            {
+              /*
+               * It can happen when a rebuild is performed or the backend is
+               * offline (OPENDJ-49). Give the server another chance to process
+               * this operation after some time.
+               */
+              Thread.sleep(50);
+              continue;
+            }
             else if (op instanceof ModifyOperation)
             {
               ModifyOperation newOp = (ModifyOperation) op;
@@ -2693,22 +2704,13 @@
         }
       } catch (ASN1Exception e)
       {
-        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-          String.valueOf(msg) + stackTraceToSingleLineString(e));
-        logError(message);
-        replayErrorMsg = message.toString();
+        replayErrorMsg = logDecodingOperationError(msg, e);
       } catch (LDAPException e)
       {
-        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-          String.valueOf(msg) + stackTraceToSingleLineString(e));
-        logError(message);
-        replayErrorMsg = message.toString();
+        replayErrorMsg = logDecodingOperationError(msg, e);
       } catch (DataFormatException e)
       {
-        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-          String.valueOf(msg) + stackTraceToSingleLineString(e));
-        logError(message);
-        replayErrorMsg = message.toString();
+        replayErrorMsg = logDecodingOperationError(msg, e);
       } catch (Exception e)
       {
         if (changeNumber != null)
@@ -2726,10 +2728,7 @@
           updateError(changeNumber);
         } else
         {
-          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-            String.valueOf(msg) + stackTraceToSingleLineString(e));
-          logError(message);
-          replayErrorMsg = message.toString();
+          replayErrorMsg = logDecodingOperationError(msg, e);
         }
       } finally
       {
@@ -2753,6 +2752,14 @@
     } while (msg != null);
   }
 
+  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
+  {
+    Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+      String.valueOf(msg) + stackTraceToSingleLineString(e));
+    logError(message);
+    return message.toString();
+  }
+
   /**
    * This method is called when an error happens while replaying
    * an operation.
@@ -4862,7 +4869,7 @@
    * {@inheritDoc}
    */
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg)
+  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
   {
     // Ignore message if fractional configuration is inconsistent and
     // we have been passed into bad data set status
@@ -4880,8 +4887,23 @@
 
       // Put update message into the replay queue
       // (block until some place in the queue is available)
-      UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
-      updateToReplayQueue.offer(updateToReplay);
+      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
+      while (!shutdown.get())
+      {
+        // loop until we can offer to the queue or shutdown was initiated
+        try
+        {
+          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
+          {
+            // successful offer to the queue, let's exit the loop
+            break;
+          }
+        }
+        catch (InterruptedException e)
+        {
+          // Thread interrupted: check for shutdown.
+        }
+      }
 
       return false;
     }
@@ -5321,10 +5343,8 @@
         case EXCLUSIVE_FRACTIONAL:
         case INCLUSIVE_FRACTIONAL:
           result.setFractional(true);
-          if (newFractionalMode == EXCLUSIVE_FRACTIONAL)
-            result.setFractionalExclusive(true);
-          else
-            result.setFractionalExclusive(false);
+          result.setFractionalExclusive(
+              newFractionalMode == EXCLUSIVE_FRACTIONAL);
           break;
       }
       result.setFractionalSpecificClassesAttributes(
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index c92c682..aab9397 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@
  */
 package org.opends.server.replication.plugin;
 
-import java.util.ArrayList;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
+import static org.opends.server.util.StaticUtils.*;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationAddListener;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.server.ConfigurationDeleteListener;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.BackupTaskListener;
-import org.opends.server.api.ExportTaskListener;
-import org.opends.server.api.ImportTaskListener;
-import org.opends.server.api.RestoreTaskListener;
-import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.api.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.types.BackupConfig;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.Control;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.Modification;
-import org.opends.server.types.Operation;
-import org.opends.server.types.RestoreConfig;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SynchronizationProviderResult;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
+import org.opends.server.types.*;
+import org.opends.server.types.operation.*;
 
 /**
  * This class is used to load the Replication code inside the JVM
@@ -104,8 +76,8 @@
    * The queue of received update messages, to be treated by the ReplayThread
    * threads.
    */
-  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
-    new LinkedBlockingQueue<UpdateToReplay>();
+  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue =
+      new LinkedBlockingQueue<UpdateToReplay>(10000);
 
   /**
    * The list of ReplayThread threads.
@@ -228,9 +200,9 @@
   }
 
   /**
-   * Creates a new domain from its configEntry, do the
-   * necessary initialization and starts it so that it is
-   * fully operational when this method returns.
+   * Creates a new domain from its configEntry, do the necessary initialization
+   * and starts it so that it is fully operational when this method returns. It
+   * is only used for tests so far.
    *
    * @param configuration The entry with the configuration of this domain.
    * @param queue         The BlockingQueue that this domain will use.
@@ -239,13 +211,13 @@
    *
    * @throws ConfigException When the configuration is not valid.
    */
-  public static LDAPReplicationDomain createNewDomain(
+  static LDAPReplicationDomain createNewDomain(
       ReplicationDomainCfg configuration,
       BlockingQueue<UpdateToReplay> queue)
       throws ConfigException
   {
-    LDAPReplicationDomain domain;
-    domain = new LDAPReplicationDomain(configuration, queue);
+    LDAPReplicationDomain domain =
+        new LDAPReplicationDomain(configuration, queue);
 
     domains.put(domain.getBaseDN(), domain);
     return domain;
@@ -362,6 +334,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public boolean isConfigurationAddAcceptable(
       ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
   {
@@ -372,6 +345,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public ConfigChangeResult applyConfigurationAdd(
      ReplicationDomainCfg configuration)
   {
@@ -652,6 +626,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processBackupBegin(Backend backend, BackupConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -665,6 +640,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processBackupEnd(Backend backend, BackupConfig config,
                                boolean successful)
   {
@@ -679,6 +655,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processRestoreBegin(Backend backend, RestoreConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -692,6 +669,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processRestoreEnd(Backend backend, RestoreConfig config,
                                 boolean successful)
   {
@@ -706,6 +684,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processImportBegin(Backend backend, LDIFImportConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -719,6 +698,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processImportEnd(Backend backend, LDIFImportConfig config,
                                boolean successful)
   {
@@ -733,6 +713,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processExportBegin(Backend backend, LDIFExportConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -746,6 +727,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processExportEnd(Backend backend, LDIFExportConfig config,
                                boolean successful)
   {
@@ -760,6 +742,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public ConfigChangeResult applyConfigurationDelete(
       ReplicationDomainCfg configuration)
   {
@@ -771,6 +754,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public boolean isConfigurationDeleteAcceptable(
       ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
   {
@@ -804,10 +788,10 @@
   /**
    * {@inheritDoc}
    */
-  public boolean
-    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
-    configuration,
-    List<Message> unacceptableReasons)
+  @Override
+  public boolean isConfigurationChangeAcceptable(
+      ReplicationSynchronizationProviderCfg configuration,
+      List<Message> unacceptableReasons)
   {
     return true;
   }
@@ -815,9 +799,9 @@
   /**
    * {@inheritDoc}
    */
-  public ConfigChangeResult
-    applyConfigurationChange
-    (ReplicationSynchronizationProviderCfg configuration)
+  @Override
+  public ConfigChangeResult applyConfigurationChange(
+      ReplicationSynchronizationProviderCfg configuration)
   {
     int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
 
@@ -838,6 +822,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void completeSynchronizationProvider()
   {
     isRegistered = true;
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 1304346..a57c110 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -26,20 +26,20 @@
  *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
-import org.opends.server.replication.protocol.LDAPUpdateMsg;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.opends.messages.Message;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
 
 /**
  * Thread that is used to get message from the replication servers (stored
@@ -56,7 +56,7 @@
   private static final DebugTracer TRACER = getTracer();
 
   private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
-  private volatile boolean shutdown = false;
+  private AtomicBoolean shutdown = new AtomicBoolean(false);
   private static int count = 0;
 
   /**
@@ -75,7 +75,7 @@
    */
   public void shutdown()
   {
-    shutdown = true;
+    shutdown.set(true);
   }
 
   /**
@@ -84,27 +84,26 @@
   @Override
   public void run()
   {
-
     if (debugEnabled())
     {
       TRACER.debugInfo("Replication Replay thread starting.");
     }
 
-    while (!shutdown)
+    while (!shutdown.get())
     {
       try
       {
         UpdateToReplay updateToreplay;
         // Loop getting an updateToReplayQueue from the update message queue and
         // replaying matching changes
-        while ( (!shutdown) &&
+        while (!shutdown.get() &&
           ((updateToreplay = updateToReplayQueue.poll(1L,
           TimeUnit.SECONDS)) != null))
         {
           // Find replication domain for that update message
           LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
           LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
-          domain.replay(updateMsg);
+          domain.replay(updateMsg, shutdown);
         }
       } catch (Exception e)
       {
diff --git a/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
index 5d94ac5..2dbae7f 100644
--- a/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -26,14 +26,15 @@
  *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.service;
-import org.opends.messages.Message;
 
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.messages.Message;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -50,7 +51,7 @@
   private static final DebugTracer TRACER = getTracer();
 
   private final ReplicationDomain repDomain;
-  private volatile boolean shutdown = false;
+  private AtomicBoolean shutdown = new AtomicBoolean(false);
   private volatile boolean done = false;
 
 
@@ -72,7 +73,7 @@
    */
   public void shutdown()
   {
-    shutdown = true;
+    shutdown.set(true);
   }
 
   /**
@@ -81,22 +82,21 @@
   @Override
   public void run()
   {
-    UpdateMsg updateMsg = null;
-
     if (debugEnabled())
     {
       TRACER.debugInfo("Replication Listener thread starting.");
     }
 
-    while (!shutdown)
+    while (!shutdown.get())
     {
+      UpdateMsg updateMsg = null;
       try
       {
-        // Loop receiving update messages and puting them in the update message
+        // Loop receiving update messages and putting them in the update message
         // queue
-        while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
+        while (!shutdown.get() && ((updateMsg = repDomain.receive()) != null))
         {
-          if (repDomain.processUpdate(updateMsg))
+          if (repDomain.processUpdate(updateMsg, shutdown))
           {
             repDomain.processUpdateDoneSynchronous(updateMsg);
           }
@@ -104,7 +104,7 @@
 
         if (updateMsg == null)
         {
-          shutdown = true;
+          shutdown.set(true);
         }
       }
       catch (Exception e)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 5bcc989..4505ed6 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -28,9 +28,8 @@
 package org.opends.server.replication.service;
 
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.replication.common.StatusMachine.*;
 
 import java.io.BufferedOutputStream;
@@ -41,6 +40,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opends.messages.Category;
@@ -50,32 +50,8 @@
 import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.HeartbeatMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.Session;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
 import org.opends.server.tasks.InitializeTargetTask;
 import org.opends.server.tasks.InitializeTask;
 import org.opends.server.types.Attribute;
@@ -3184,41 +3160,40 @@
   public abstract long countEntries() throws DirectoryException;
 
   /**
-   * This method should handle the processing of {@link UpdateMsg} receive
-   * from remote replication entities.
+   * This method should handle the processing of {@link UpdateMsg} receive from
+   * remote replication entities.
    * <p>
-   * This method will be called by a single thread and should therefore
-   * should not be blocking.
+   * This method will be called by a single thread and should therefore should
+   * not be blocking.
    *
-   * @param updateMsg The {@link UpdateMsg} that was received.
-   *
-   * @return A boolean indicating if the processing is completed at return
-   *                   time.
-   *                   If <code> true </code> is returned, no further
-   *                   processing is necessary.
-   *
-   *                   If <code> false </code> is returned, the subclass should
-   *                   call the method
-   *                   {@link #processUpdateDone(UpdateMsg, String)}
-   *                   and update the ServerState
-   *                   When this processing is complete.
-   *
+   * @param updateMsg
+   *          The {@link UpdateMsg} that was received.
+   * @param shutdown
+   *          whether the server initiated shutdown
+   * @return A boolean indicating if the processing is completed at return time.
+   *         If <code> true </code> is returned, no further processing is
+   *         necessary. If <code> false </code> is returned, the subclass should
+   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
+   *         update the ServerState When this processing is complete.
    */
-  public abstract boolean processUpdate(UpdateMsg updateMsg);
+  public abstract boolean processUpdate(UpdateMsg updateMsg,
+      AtomicBoolean shutdown);
 
   /**
    * This method must be called after each call to
-   * {@link #processUpdate(UpdateMsg)} when the processing of the update is
-   * completed.
+   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+   * update is completed.
    * <p>
    * It is useful for implementation needing to process the update in an
-   * asynchronous way or using several threads, but must be called even
-   * by implementation doing it in a synchronous, single-threaded way.
+   * asynchronous way or using several threads, but must be called even by
+   * implementation doing it in a synchronous, single-threaded way.
    *
-   * @param  msg The UpdateMsg whose processing was completed.
-   * @param replayErrorMsg if not null, this means an error occurred during the
-   * replay of this update, and this is the matching human readable message
-   * describing the problem.
+   * @param msg
+   *          The UpdateMsg whose processing was completed.
+   * @param replayErrorMsg
+   *          if not null, this means an error occurred during the replay of
+   *          this update, and this is the matching human readable message
+   *          describing the problem.
    */
   public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
   {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 6af1251..210e3f5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -23,24 +23,20 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
-import org.opends.server.util.StaticUtils;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
@@ -60,84 +56,80 @@
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.service.ReplicationDomain;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
-import org.opends.server.types.ObjectClass;
-import org.opends.server.types.ResultCode;
+import org.opends.server.types.*;
+import org.opends.server.util.StaticUtils;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import static org.testng.Assert.*;
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
 
 /**
  * Various tests around fractional replication
  */
+@SuppressWarnings("javadoc")
 public class FractionalReplicationTest extends ReplicationTestCase {
 
-  // The RS
+  /** The RS */
   private ReplicationServer replicationServer = null;
-  // RS port
+  /** RS port */
   private int replServerPort = -1;
-  // Represents the real domain to test (replays and filters)
+  /** Represents the real domain to test (replays and filters) */
   private Entry fractionalDomainCfgEntry = null;
-  // The domain used to send updates to the reald domain
+  /** The domain used to send updates to the reald domain */
   private FakeReplicationDomain replicationDomain = null;
 
-  // Ids of servers
+  /** Ids of servers */
   private static final int DS1_ID = 1; // fractional domain
   private static final int DS2_ID = 2; // fake domain
   private static final int RS_ID = 91; // replication server
 
   private final String testName = this.getClass().getSimpleName();
 
-  // Fractional mode
+  /** Fractional mode */
   private static final int EXCLUDE_FRAC_MODE = 0;
   private static final int INCLUDE_FRAC_MODE = 1;
 
   int initWindow = 100;
   private ChangeNumberGenerator gen = null;
 
-  // The tracer object for the debug logger
+  /** The tracer object for the debug logger */
   private static final DebugTracer TRACER = getTracer();
 
-  // Number of seconds before generating an error if some conditions not met
+  /** Number of seconds before generating an error if some conditions not met */
   private static final int TIMEOUT = 10000;
 
-  // Uuid of the manipulated entry
+  /** Uuid of the manipulated entry */
   private static final String ENTRY_UUID =
     "11111111-1111-1111-1111-111111111111";
   private static final String ENTRY_UUID2 =
     "22222222-2222-2222-2222-222222222222";
   private static final String ENTRY_UUID3 =
     "33333333-3333-3333-3333-333333333333";
-  // Dn of the manipulated entry
+  /** Dn of the manipulated entry */
   private static String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING;
 
-  // Optional attribute not part of concerned attributes of the fractional
-  // configuration during tests. It should not be impacted by fractional
-  // mechanism
+  /**
+   * Optional attribute not part of concerned attributes of the fractional
+   * configuration during tests. It should not be impacted by fractional
+   * mechanism
+   */
   private static final String OPTIONAL_ATTR = "description";
 
-  // Optional attribute used as synchronization attribute to know when the modify
-  // operation has been processed (used as add new attribute in the modify operation)
-  // It may or may not be part of the filtered attributes, depending on the fractional
-  // test mode : exclusive or inclusive
+  /**
+   * Optional attribute used as synchronization attribute to know when the
+   * modify operation has been processed (used as add new attribute in the
+   * modify operation) It may or may not be part of the filtered attributes,
+   * depending on the fractional test mode : exclusive or inclusive
+   */
   private static final String SYNCHRO_OPTIONAL_ATTR = "seeAlso";
 
-  // Second test backend
+  /** Second test backend */
   private static final String TEST2_ROOT_DN_STRING = "dc=example,dc=com";
   private static final String TEST2_ORG_DN_STRING = "o=test2," + TEST2_ROOT_DN_STRING;
   private static String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING;
@@ -456,7 +448,6 @@
   private void createFakeReplicationDomain(boolean firstBackend, long generationId)
   {
     try{
-
       List<String> replicationServers = new ArrayList<String>();
       replicationServers.add("localhost:" + replServerPort);
 
@@ -474,7 +465,7 @@
       String rdPortStr = serverStr.substring(index + 1);
       try
       {
-        rdPort = (new Integer(rdPortStr)).intValue();
+        rdPort = Integer.valueOf(rdPortStr);
       } catch (Exception e)
       {
         fail("Enable to get an int from: " + rdPortStr);
@@ -706,11 +697,6 @@
       this.exportedEntryCount = exportedEntryCount;
     }
 
-    public void initImport(StringBuilder importString)
-    {
-      this.importString = importString;
-    }
-
     @Override
     public long countEntries() throws DirectoryException
     {
@@ -730,7 +716,6 @@
         throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
           ERR_BACKEND_EXPORT_ENTRY.get("", ""));
       }
-
     }
 
     @Override
@@ -761,17 +746,12 @@
     }
 
     @Override
-    public boolean processUpdate(UpdateMsg updateMsg)
+    public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
     {
       if (queue != null)
         queue.add(updateMsg);
       return true;
     }
-
-    public void setGenerationID(long newGenerationID)
-    {
-      generationID = newGenerationID;
-    }
   }
 
   private static final String REPLICATION_GENERATION_ID =
@@ -1320,7 +1300,7 @@
           "domain status obtained after " + (toWait-nSec) + " second(s).");
         return;
       }
-      sleep(1000);
+      TestCaseUtils.sleep(1000);
       nSec--;
     }
     fail("Did not get expected replication domain status: expected <" + expectedStatus +
@@ -1567,9 +1547,8 @@
       createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
 
       /*
-       * Perform add operation with fornbidden attribute in RDN
+       * Perform add operation with forbidden attribute in RDN
        */
-
       String entryLdif = "dn: displayName=ValueToBeKept," +
         TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
         "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1620,9 +1599,8 @@
        */
 
       /*
-       * Perform add operation with fornbidden attribute in RDN
+       * Perform add operation with forbidden attribute in RDN
        */
-
       entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," +
         TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
         "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1698,9 +1676,8 @@
       createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
 
       /*
-       * Perform add operation with fornbidden attribute in RDN
+       * Perform add operation with forbidden attribute in RDN
        */
-
       String entryLdif = "dn: displayName=ValueToBeKept," +
         TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
         "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1754,9 +1731,8 @@
        */
 
       /*
-       * Perform add operation with fornbidden attribute in RDN
+       * Perform add operation with forbidden attribute in RDN
        */
-
       entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," +
         TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
         "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1834,9 +1810,8 @@
       createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
 
       /*
-       * Perform add operation with fornbidden attribute in RDN
+       * Perform add operation with forbidden attribute in RDN
        */
-
       String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ;
       String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" +
         "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1936,7 +1911,7 @@
   @Test
   public void testModifyDnWithForbiddenAttrInRDNInclude()
   {
-     String testcase = "testModifyDnWithForbiddenAttrInRDNInclude";
+    String testcase = "testModifyDnWithForbiddenAttrInRDNInclude";
 
     initTest();
 
@@ -1953,9 +1928,8 @@
       createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
 
       /*
-       * Perform add operation with fornbidden attribute in RDN
+       * Perform add operation with forbidden attribute in RDN
        */
-
       String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ;
       String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" +
         "objectClass: person\n" + "objectClass: organizationalPerson\n" +
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
index fde1cfd..bd59fbd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -23,13 +23,16 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
-import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
 
 import java.util.ArrayList;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.TestCaseUtils;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
@@ -40,24 +43,20 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.ModifyDNMsg;
 import org.opends.server.replication.protocol.DeleteMsg;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.DN;
-import org.opends.server.types.RDN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.ResultCode;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.types.*;
 import org.testng.annotations.Test;
 
-
-import static org.testng.Assert.*;
-
-
 /**
  * Test the naming conflict resolution code.
  */
+@SuppressWarnings("javadoc")
 public class NamingConflictTest extends ReplicationTestCase
 {
+
+  private static final AtomicBoolean SHUTDOWN = new AtomicBoolean(false);
+
   /**
    * Test for issue 3402 : test, that a modrdn that is older than an other
    * modrdn but that is applied later is ignored.
@@ -123,10 +122,10 @@
       "uid=simultaneous2");
 
       // Put the message in the replay queue
-      domain.processUpdate(modDnMsg);
+      domain.processUpdate(modDnMsg, SHUTDOWN);
 
       // Make the domain replay the change from the replay queue
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // This MODIFY DN uses an older DN and should therefore be cancelled
       // at replay time.
@@ -137,11 +136,11 @@
       "uid=simulatneouswrong");
 
       // Put the message in the replay queue
-      domain.processUpdate(modDnMsg);
+      domain.processUpdate(modDnMsg, SHUTDOWN);
 
       // Make the domain replay the change from the replay queue
       // and resolve conflict
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // Expect the conflict resolution
       assertFalse(DirectoryServer.entryExists(entry.getDN()),
@@ -158,8 +157,6 @@
    * a delete operation has removed one of the conflicting entries
    * the other conflicting entry is correctly renamed to its
    * original name.
-   *
-   * @throws Exception if the test fails.
    */
   @Test(enabled=true)
   public void conflictCleaningDelete() throws Exception
@@ -215,10 +212,10 @@
             null);
 
       // Put the message in the replay queue
-      domain.processUpdate(addMsg);
+      domain.processUpdate(addMsg, SHUTDOWN);
 
       // Make the domain replay the change from the replay queue
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // Now delete the first entry that was added at the beginning
       TestCaseUtils.deleteEntry(entry.getDN());
@@ -226,7 +223,7 @@
       // Expect the conflict resolution : the second entry should now
       // have been renamed with the original DN.
       Entry resultEntry = DirectoryServer.getEntry(entry.getDN());
-      assertTrue(resultEntry != null, "The conflict was not cleared");
+      assertNotNull(resultEntry, "The conflict was not cleared");
       assertEquals(getEntryUUID(resultEntry.getDN()),
           "c9cb8c3c-615a-4122-865d-50323aaaed48",
           "The wrong entry has been renamed");
@@ -300,10 +297,10 @@
             null);
 
       // Put the message in the replay queue
-      domain.processUpdate(addMsg);
+      domain.processUpdate(addMsg, SHUTDOWN);
 
       // Make the domain replay the change from the replay queue
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // Now delete the first entry that was added at the beginning
       InternalClientConnection conn =
@@ -316,7 +313,7 @@
       // Expect the conflict resolution : the second entry should now
       // have been renamed with the original DN.
       Entry resultEntry = DirectoryServer.getEntry(entry.getDN());
-      assertTrue(resultEntry != null, "The conflict was not cleared");
+      assertNotNull(resultEntry, "The conflict was not cleared");
       assertEquals(getEntryUUID(resultEntry.getDN()),
           "c9cb8c3c-615a-4122-865d-50323aaaed48",
           "The wrong entry has been renamed");
@@ -409,9 +406,9 @@
       delMsg.setSubtreeDelete(true);
 
       // Put the message in the replay queue
-      domain.processUpdate(delMsg);
+      domain.processUpdate(delMsg, SHUTDOWN);
       // Make the domain replay the change from the replay queue
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // Expect the subtree to be deleted and no conflict entry created
       assertFalse(DirectoryServer.entryExists(parentEntry.getDN()),
@@ -488,9 +485,9 @@
       // NOT SUBTREE
 
       // Put the message in the replay queue
-      domain.processUpdate(delMsg);
+      domain.processUpdate(delMsg, SHUTDOWN);
       // Make the domain replay the change from the replay queue
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // Expect the parent entry to be deleted
       assertTrue(!DirectoryServer.entryExists(parentEntry.getDN()),
@@ -502,7 +499,6 @@
           "+cn=child,o=test");
       assertTrue(DirectoryServer.entryExists(childDN),
           "Child entry conflict exist with DN="+childDN);
-
     }
     finally
     {
@@ -572,9 +568,9 @@
           new ArrayList<Attribute>());
 
       // Put the message in the replay queue
-      domain.processUpdate(addMsg);
+      domain.processUpdate(addMsg, SHUTDOWN);
       // Make the domain replay the change from the replay queue
-      domain.replay(queue.take().getUpdateMessage());
+      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
 
       // Expect the parent entry to be deleted
       assertFalse(DirectoryServer.entryExists(parentEntry.getDN()),
@@ -586,7 +582,6 @@
           "+cn=child,o=test");
       assertTrue(DirectoryServer.entryExists(childDN),
           "Child entry conflict exist with DN="+childDN);
-
     }
     finally
     {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 59ef954..60d967b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -23,36 +23,45 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
-import org.opends.server.types.ResultCode;
+import static org.opends.messages.ReplicationMessages.*;
+
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.config.ConfigException;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.DirectoryException;
-import static org.opends.messages.ReplicationMessages.*;
+import org.opends.server.types.ResultCode;
 
 /**
  * This class is the minimum implementation of a Concrete ReplicationDomain
  * used to test the Generic Replication Service.
  */
+@SuppressWarnings("javadoc")
 public class FakeReplicationDomain extends ReplicationDomain
 {
-  // A blocking queue that is used to send the UpdateMsg received from
-  // the Replication Service.
-  BlockingQueue<UpdateMsg> queue = null;
+  /**
+   * A blocking queue that is used to send the UpdateMsg received from the
+   * Replication Service.
+   */
+  private BlockingQueue<UpdateMsg> queue = null;
 
-  // A string that will be exported should exportBackend be called.
-  String exportString = null;
+  /** A string that will be exported should exportBackend be called. */
+  private String exportString = null;
 
-  // A StringBuilder that will be used to build a build a new String should the
-  // import be called.
-  StringBuilder importString = null;
+  /**
+   * A StringBuilder that will be used to build a build a new String should the
+   * import be called.
+   */
+  private StringBuilder importString = null;
 
   private int exportedEntryCount;
 
@@ -142,7 +151,7 @@
   }
 
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg)
+  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
   {
     if (queue != null)
       queue.add(updateMsg);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index fb56b9c..f41ac84 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -23,36 +23,36 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
-import org.opends.server.types.ResultCode;
+import static org.opends.messages.ReplicationMessages.*;
+
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.config.ConfigException;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.DirectoryException;
-import static org.opends.messages.ReplicationMessages.*;
+import org.opends.server.types.ResultCode;
 
 /**
  * This class is the minimum implementation of a Concrete ReplicationDomain
  * used to test the Generic Replication Service.
  */
+@SuppressWarnings("javadoc")
 public class FakeStressReplicationDomain extends ReplicationDomain
 {
-  // A blocking queue that is used to send the UpdateMsg received from
-  // the Replication Service.
-  BlockingQueue<UpdateMsg> queue = null;
-
-  // A string that will be exported should exportBackend be called.
-  String exportString = null;
-
-  // A StringBuilder that will be used to build a build a new String should the
-  // import be called.
-  StringBuilder importString = null;
+  /**
+   * A blocking queue that is used to send the UpdateMsg received from the
+   * Replication Service.
+   */
+  private BlockingQueue<UpdateMsg> queue = null;
 
   public FakeStressReplicationDomain(
       String serviceID,
@@ -68,23 +68,8 @@
     this.queue = queue;
   }
 
-  public FakeStressReplicationDomain(
-      String serviceID,
-      int serverID,
-      Collection<String> replicationServers,
-      int window,
-      long heartbeatInterval,
-      String exportString,
-      StringBuilder importString) throws ConfigException
-  {
-    super(serviceID, serverID, 100);
-    startPublishService(replicationServers, window, heartbeatInterval, 500);
-    startListenService();
-    this.exportString = exportString;
-    this.importString = importString;
-  }
+  private static final int IMPORT_SIZE = 100000000;
 
-  final int IMPORT_SIZE = 100000000;
   @Override
   public long countEntries() throws DirectoryException
   {
@@ -150,7 +135,7 @@
   }
 
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg)
+  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
   {
     if (queue != null)
       queue.add(updateMsg);

--
Gitblit v1.10.0