mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.20.2013 856fdd0571358c660afaf379f8e774ab8b24f05c
OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock 


The code replaying the replicated operations now handles BUSY and UNAVAILABLE result codes and will retry replaying these operations.
Prevented a possible OutOfMemoryError by bounding the queue holding the replicated operations.


LDAPReplicationDomain.java, ReplicationDomain.java:
In replay(), added the AtomicBoolean shutdown parameter + tested for server shutdown + handled BUSY and UNAVAILABLE result codes +
In processUpdate(), added the AtomicBoolean shutdown parameter + tested for server shutdown + ensured the code works with a bounded queue.
Extracted logDecodingOperationError() out of replay().

MultimasterReplication.java:
Made updateToReplayQueue a bounded queue (it was unbounded before).

ReplayThread.java, ListenerThread.java:
Converted shutdown boolean instance member into an AtomicBoolean.
9 files modified
680 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 194 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 87 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ListenerThread.java 28 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 85 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 116 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java 57 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 37 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 49 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -37,14 +36,12 @@
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.io.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -63,14 +60,7 @@
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -80,12 +70,7 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -93,16 +78,7 @@
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.*;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import org.opends.server.types.operation.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -191,8 +167,10 @@
   */
  private static final DebugTracer TRACER = getTracer();
  // The update to replay message queue where the listener thread is going to
  // push incoming update messages.
  /**
   * The update to replay message queue where the listener thread is going to
   * push incoming update messages.
   */
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
  private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
@@ -238,9 +216,11 @@
  private volatile boolean disabled = false;
  private volatile boolean stateSavingDisabled = false;
  // This list is used to temporary store operations that needs
  // to be replayed at session establishment time.
  private final TreeMap<ChangeNumber, FakeOperation> replayOperations  =
  /**
   * This list is used to temporary store operations that needs to be replayed
   * at session establishment time.
   */
  private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
    new TreeMap<ChangeNumber, FakeOperation>();
  /**
@@ -288,7 +268,7 @@
   * Fractional replication variables.
   */
  // Holds the fractional configuration for this domain, if any.
  /** Holds the fractional configuration for this domain, if any. */
  private FractionalConfig fractionalConfig = null;
  /**
@@ -341,29 +321,39 @@
   * fractionalFilterOperation(PreOperationModifyOperation
   *  modifyOperation, boolean performFiltering) method
   */
  // The operation contains attributes subject to fractional filtering according
  // to the fractional configuration
  /**
   * The operation contains attributes subject to fractional filtering according
   * to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
  // The operation contains no attributes subject to fractional filtering
  // according to the fractional configuration
  /**
   * The operation contains no attributes subject to fractional filtering
   * according to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
  // The operation should become a no-op
  /** The operation should become a no-op. */
  private static final int FRACTIONAL_BECOME_NO_OP = 3;
  // This configuration boolean indicates if this ReplicationDomain should log
  // ChangeNumbers.
  /**
   * This configuration boolean indicates if this ReplicationDomain should log
   * ChangeNumbers.
   */
  private boolean logChangeNumber = false;
  // This configuration integer indicates the time the domain keeps the
  // historical information necessary to solve conflicts.
  // When a change stored in the historical part of the user entry has a date
  // (from its replication ChangeNumber) older than this delay, it is candidate
  // to be purged.
  /**
   * This configuration integer indicates the time the domain keeps the
   * historical information necessary to solve conflicts.<br>
   * When a change stored in the historical part of the user entry has a date
   * (from its replication ChangeNumber) older than this delay, it is candidate
   * to be purged.
   */
  private long histPurgeDelayInMilliSec = 0;
  // The last change number purged in this domain. Allows to have a continuous
  // purging process from one purge processing (task run) to the next one.
  // Values 0 when the server starts.
  /**
   * The last change number purged in this domain. Allows to have a continuous
   * purging process from one purge processing (task run) to the next one.
   * Values 0 when the server starts.
   */
  private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
  /**
@@ -752,10 +742,8 @@
    if (fractionalConfig.isFractional())
    {
      // Set new fractional configuration values
      if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
        fractionalConfig.setFractionalExclusive(true);
      else
        fractionalConfig.setFractionalExclusive(false);
      fractionalConfig.setFractionalExclusive(
          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
      fractionalConfig.setFractionalSpecificClassesAttributes(
        newFractionalConfig.getFractionalSpecificClassesAttributes());
      fractionalConfig.setFractionalAllClassesAttributes(
@@ -950,10 +938,8 @@
    // Set stored fractional configuration values
    if (storedFractionalConfig.isFractional())
    {
      if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
        storedFractionalConfig.setFractionalExclusive(true);
      else
        storedFractionalConfig.setFractionalExclusive(false);
      storedFractionalConfig.setFractionalExclusive(
          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
    }
    storedFractionalConfig.setFractionalSpecificClassesAttributes(
      storedFractionalSpecificClassesAttributes);
@@ -1434,12 +1420,10 @@
            AttributeValue rdnAttributeValue =
              entryRdn.getAttributeValue(attributeType);
            List<Attribute> attrList = attributesMap.get(attributeType);
            Iterator<Attribute> attrIt = attrList.iterator();
            AttributeValue sameAttrValue = null;
            //    Locate the attribute value identical to the one in the RDN
            while(attrIt.hasNext())
            for (Attribute attr : attrList)
            {
              Attribute attr = attrIt.next();
              if (attr.contains(rdnAttributeValue))
              {
                for (AttributeValue attrValue : attr) {
@@ -2577,9 +2561,12 @@
  /**
   * Create and replay a synchronized Operation from an UpdateMsg.
   *
   * @param msg The UpdateMsg to be replayed.
   * @param msg
   *          The UpdateMsg to be replayed.
   * @param shutdown
   *          whether the server initiated shutdown
   */
  public void replay(LDAPUpdateMsg msg)
  public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
  {
    Operation op = null;
    boolean replayDone = false;
@@ -2599,6 +2586,11 @@
        while ((!dependency) && (!replayDone) && (retryCount-- > 0))
        {
          if (shutdown.get())
          {
            // shutdown initiated, let's leave
            return;
          }
          // Try replay the operation
          op.setInternalOperation(true);
          op.setSynchronizationOperation(true);
@@ -2622,6 +2614,25 @@
              // renamed by a more recent modify DN.
              replayDone = true;
            }
            else if (result == ResultCode.BUSY)
            {
              /*
               * We probably could not get a lock (OPENDJ-885). Give the server
               * another chance to process this operation immediately.
               */
              Thread.yield();
              continue;
            }
            else if (result == ResultCode.UNAVAILABLE)
            {
              /*
               * It can happen when a rebuild is performed or the backend is
               * offline (OPENDJ-49). Give the server another chance to process
               * this operation after some time.
               */
              Thread.sleep(50);
              continue;
            }
            else if (op instanceof ModifyOperation)
            {
              ModifyOperation newOp = (ModifyOperation) op;
@@ -2693,22 +2704,13 @@
        }
      } catch (ASN1Exception e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (LDAPException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (DataFormatException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (Exception e)
      {
        if (changeNumber != null)
@@ -2726,10 +2728,7 @@
          updateError(changeNumber);
        } else
        {
          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
            String.valueOf(msg) + stackTraceToSingleLineString(e));
          logError(message);
          replayErrorMsg = message.toString();
          replayErrorMsg = logDecodingOperationError(msg, e);
        }
      } finally
      {
@@ -2753,6 +2752,14 @@
    } while (msg != null);
  }
  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
  {
    Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
      String.valueOf(msg) + stackTraceToSingleLineString(e));
    logError(message);
    return message.toString();
  }
  /**
   * This method is called when an error happens while replaying
   * an operation.
@@ -4862,7 +4869,7 @@
   * {@inheritDoc}
   */
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  {
    // Ignore message if fractional configuration is inconsistent and
    // we have been passed into bad data set status
@@ -4880,8 +4887,23 @@
      // Put update message into the replay queue
      // (block until some place in the queue is available)
      UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
      updateToReplayQueue.offer(updateToReplay);
      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
      while (!shutdown.get())
      {
        // loop until we can offer to the queue or shutdown was initiated
        try
        {
          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
          {
            // successful offer to the queue, let's exit the loop
            break;
          }
        }
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
        }
      }
      return false;
    }
@@ -5321,10 +5343,8 @@
        case EXCLUSIVE_FRACTIONAL:
        case INCLUSIVE_FRACTIONAL:
          result.setFractional(true);
          if (newFractionalMode == EXCLUSIVE_FRACTIONAL)
            result.setFractionalExclusive(true);
          else
            result.setFractionalExclusive(false);
          result.setFractionalExclusive(
              newFractionalMode == EXCLUSIVE_FRACTIONAL);
          break;
      }
      result.setFractionalSpecificClassesAttributes(
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@
 */
package org.opends.server.replication.plugin;
import java.util.ArrayList;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.BackupTaskListener;
import org.opends.server.api.ExportTaskListener;
import org.opends.server.api.ImportTaskListener;
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import org.opends.server.types.*;
import org.opends.server.types.operation.*;
/**
 * This class is used to load the Replication code inside the JVM
@@ -104,8 +76,8 @@
   * The queue of received update messages, to be treated by the ReplayThread
   * threads.
   */
  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
    new LinkedBlockingQueue<UpdateToReplay>();
  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue =
      new LinkedBlockingQueue<UpdateToReplay>(10000);
  /**
   * The list of ReplayThread threads.
@@ -228,9 +200,9 @@
  }
  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and starts it so that it is
   * fully operational when this method returns.
   * Creates a new domain from its configEntry, do the necessary initialization
   * and starts it so that it is fully operational when this method returns. It
   * is only used for tests so far.
   *
   * @param configuration The entry with the configuration of this domain.
   * @param queue         The BlockingQueue that this domain will use.
@@ -239,13 +211,13 @@
   *
   * @throws ConfigException When the configuration is not valid.
   */
  public static LDAPReplicationDomain createNewDomain(
  static LDAPReplicationDomain createNewDomain(
      ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> queue)
      throws ConfigException
  {
    LDAPReplicationDomain domain;
    domain = new LDAPReplicationDomain(configuration, queue);
    LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue);
    domains.put(domain.getBaseDN(), domain);
    return domain;
@@ -362,6 +334,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationAddAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
@@ -372,6 +345,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationAdd(
     ReplicationDomainCfg configuration)
  {
@@ -652,6 +626,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -665,6 +640,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
@@ -679,6 +655,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -692,6 +669,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
@@ -706,6 +684,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -719,6 +698,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
@@ -733,6 +713,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -746,6 +727,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
@@ -760,6 +742,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationDelete(
      ReplicationDomainCfg configuration)
  {
@@ -771,6 +754,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationDeleteAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
@@ -804,10 +788,10 @@
  /**
   * {@inheritDoc}
   */
  public boolean
    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
    configuration,
    List<Message> unacceptableReasons)
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationSynchronizationProviderCfg configuration,
      List<Message> unacceptableReasons)
  {
    return true;
  }
@@ -815,9 +799,9 @@
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult
    applyConfigurationChange
    (ReplicationSynchronizationProviderCfg configuration)
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationSynchronizationProviderCfg configuration)
  {
    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
@@ -838,6 +822,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void completeSynchronizationProvider()
  {
    isRegistered = true;
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -26,20 +26,20 @@
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
/**
 * Thread that is used to get message from the replication servers (stored
@@ -56,7 +56,7 @@
  private static final DebugTracer TRACER = getTracer();
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  private volatile boolean shutdown = false;
  private AtomicBoolean shutdown = new AtomicBoolean(false);
  private static int count = 0;
  /**
@@ -75,7 +75,7 @@
   */
  public void shutdown()
  {
    shutdown = true;
    shutdown.set(true);
  }
  /**
@@ -84,27 +84,26 @@
  @Override
  public void run()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Replay thread starting.");
    }
    while (!shutdown)
    while (!shutdown.get())
    {
      try
      {
        UpdateToReplay updateToreplay;
        // Loop getting an updateToReplayQueue from the update message queue and
        // replaying matching changes
        while ( (!shutdown) &&
        while (!shutdown.get() &&
          ((updateToreplay = updateToReplayQueue.poll(1L,
          TimeUnit.SECONDS)) != null))
        {
          // Find replication domain for that update message
          LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
          LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
          domain.replay(updateMsg);
          domain.replay(updateMsg, shutdown);
        }
      } catch (Exception e)
      {
opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -26,14 +26,15 @@
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
import org.opends.messages.Message;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -50,7 +51,7 @@
  private static final DebugTracer TRACER = getTracer();
  private final ReplicationDomain repDomain;
  private volatile boolean shutdown = false;
  private AtomicBoolean shutdown = new AtomicBoolean(false);
  private volatile boolean done = false;
@@ -72,7 +73,7 @@
   */
  public void shutdown()
  {
    shutdown = true;
    shutdown.set(true);
  }
  /**
@@ -81,22 +82,21 @@
  @Override
  public void run()
  {
    UpdateMsg updateMsg = null;
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Listener thread starting.");
    }
    while (!shutdown)
    while (!shutdown.get())
    {
      UpdateMsg updateMsg = null;
      try
      {
        // Loop receiving update messages and puting them in the update message
        // Loop receiving update messages and putting them in the update message
        // queue
        while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
        while (!shutdown.get() && ((updateMsg = repDomain.receive()) != null))
        {
          if (repDomain.processUpdate(updateMsg))
          if (repDomain.processUpdate(updateMsg, shutdown))
          {
            repDomain.processUpdateDoneSynchronous(updateMsg);
          }
@@ -104,7 +104,7 @@
        if (updateMsg == null)
        {
          shutdown = true;
          shutdown.set(true);
        }
      }
      catch (Exception e)
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -28,9 +28,8 @@
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import java.io.BufferedOutputStream;
@@ -41,6 +40,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -50,32 +50,8 @@
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
@@ -3184,41 +3160,40 @@
  public abstract long countEntries() throws DirectoryException;
  /**
   * This method should handle the processing of {@link UpdateMsg} receive
   * from remote replication entities.
   * This method should handle the processing of {@link UpdateMsg} receive from
   * remote replication entities.
   * <p>
   * This method will be called by a single thread and should therefore
   * should not be blocking.
   * This method will be called by a single thread and should therefore should
   * not be blocking.
   *
   * @param updateMsg The {@link UpdateMsg} that was received.
   *
   * @return A boolean indicating if the processing is completed at return
   *                   time.
   *                   If <code> true </code> is returned, no further
   *                   processing is necessary.
   *
   *                   If <code> false </code> is returned, the subclass should
   *                   call the method
   *                   {@link #processUpdateDone(UpdateMsg, String)}
   *                   and update the ServerState
   *                   When this processing is complete.
   *
   * @param updateMsg
   *          The {@link UpdateMsg} that was received.
   * @param shutdown
   *          whether the server initiated shutdown
   * @return A boolean indicating if the processing is completed at return time.
   *         If <code> true </code> is returned, no further processing is
   *         necessary. If <code> false </code> is returned, the subclass should
   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
   *         update the ServerState When this processing is complete.
   */
  public abstract boolean processUpdate(UpdateMsg updateMsg);
  public abstract boolean processUpdate(UpdateMsg updateMsg,
      AtomicBoolean shutdown);
  /**
   * This method must be called after each call to
   * {@link #processUpdate(UpdateMsg)} when the processing of the update is
   * completed.
   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
   * update is completed.
   * <p>
   * It is useful for implementation needing to process the update in an
   * asynchronous way or using several threads, but must be called even
   * by implementation doing it in a synchronous, single-threaded way.
   * asynchronous way or using several threads, but must be called even by
   * implementation doing it in a synchronous, single-threaded way.
   *
   * @param  msg The UpdateMsg whose processing was completed.
   * @param replayErrorMsg if not null, this means an error occurred during the
   * replay of this update, and this is the matching human readable message
   * describing the problem.
   * @param msg
   *          The UpdateMsg whose processing was completed.
   * @param replayErrorMsg
   *          if not null, this means an error occurred during the replay of
   *          this update, and this is the matching human readable message
   *          describing the problem.
   */
  public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -23,24 +23,20 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import org.opends.server.util.StaticUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -60,84 +56,80 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ObjectClass;
import org.opends.server.types.ResultCode;
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Various tests around fractional replication
 */
@SuppressWarnings("javadoc")
public class FractionalReplicationTest extends ReplicationTestCase {
  // The RS
  /** The RS */
  private ReplicationServer replicationServer = null;
  // RS port
  /** RS port */
  private int replServerPort = -1;
  // Represents the real domain to test (replays and filters)
  /** Represents the real domain to test (replays and filters) */
  private Entry fractionalDomainCfgEntry = null;
  // The domain used to send updates to the reald domain
  /** The domain used to send updates to the reald domain */
  private FakeReplicationDomain replicationDomain = null;
  // Ids of servers
  /** Ids of servers */
  private static final int DS1_ID = 1; // fractional domain
  private static final int DS2_ID = 2; // fake domain
  private static final int RS_ID = 91; // replication server
  private final String testName = this.getClass().getSimpleName();
  // Fractional mode
  /** Fractional mode */
  private static final int EXCLUDE_FRAC_MODE = 0;
  private static final int INCLUDE_FRAC_MODE = 1;
  int initWindow = 100;
  private ChangeNumberGenerator gen = null;
  // The tracer object for the debug logger
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  // Number of seconds before generating an error if some conditions not met
  /** Number of seconds before generating an error if some conditions not met */
  private static final int TIMEOUT = 10000;
  // Uuid of the manipulated entry
  /** Uuid of the manipulated entry */
  private static final String ENTRY_UUID =
    "11111111-1111-1111-1111-111111111111";
  private static final String ENTRY_UUID2 =
    "22222222-2222-2222-2222-222222222222";
  private static final String ENTRY_UUID3 =
    "33333333-3333-3333-3333-333333333333";
  // Dn of the manipulated entry
  /** Dn of the manipulated entry */
  private static String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING;
  // Optional attribute not part of concerned attributes of the fractional
  // configuration during tests. It should not be impacted by fractional
  // mechanism
  /**
   * Optional attribute not part of concerned attributes of the fractional
   * configuration during tests. It should not be impacted by fractional
   * mechanism
   */
  private static final String OPTIONAL_ATTR = "description";
  // Optional attribute used as synchronization attribute to know when the modify
  // operation has been processed (used as add new attribute in the modify operation)
  // It may or may not be part of the filtered attributes, depending on the fractional
  // test mode : exclusive or inclusive
  /**
   * Optional attribute used as synchronization attribute to know when the
   * modify operation has been processed (used as add new attribute in the
   * modify operation) It may or may not be part of the filtered attributes,
   * depending on the fractional test mode : exclusive or inclusive
   */
  private static final String SYNCHRO_OPTIONAL_ATTR = "seeAlso";
  // Second test backend
  /** Second test backend */
  private static final String TEST2_ROOT_DN_STRING = "dc=example,dc=com";
  private static final String TEST2_ORG_DN_STRING = "o=test2," + TEST2_ROOT_DN_STRING;
  private static String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING;
@@ -456,7 +448,6 @@
  private void createFakeReplicationDomain(boolean firstBackend, long generationId)
  {
    try{
      List<String> replicationServers = new ArrayList<String>();
      replicationServers.add("localhost:" + replServerPort);
@@ -474,7 +465,7 @@
      String rdPortStr = serverStr.substring(index + 1);
      try
      {
        rdPort = (new Integer(rdPortStr)).intValue();
        rdPort = Integer.valueOf(rdPortStr);
      } catch (Exception e)
      {
        fail("Enable to get an int from: " + rdPortStr);
@@ -706,11 +697,6 @@
      this.exportedEntryCount = exportedEntryCount;
    }
    public void initImport(StringBuilder importString)
    {
      this.importString = importString;
    }
    @Override
    public long countEntries() throws DirectoryException
    {
@@ -730,7 +716,6 @@
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
          ERR_BACKEND_EXPORT_ENTRY.get("", ""));
      }
    }
    @Override
@@ -761,17 +746,12 @@
    }
    @Override
    public boolean processUpdate(UpdateMsg updateMsg)
    public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
    {
      if (queue != null)
        queue.add(updateMsg);
      return true;
    }
    public void setGenerationID(long newGenerationID)
    {
      generationID = newGenerationID;
    }
  }
  private static final String REPLICATION_GENERATION_ID =
@@ -1320,7 +1300,7 @@
          "domain status obtained after " + (toWait-nSec) + " second(s).");
        return;
      }
      sleep(1000);
      TestCaseUtils.sleep(1000);
      nSec--;
    }
    fail("Did not get expected replication domain status: expected <" + expectedStatus +
@@ -1567,9 +1547,8 @@
      createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
      /*
       * Perform add operation with fornbidden attribute in RDN
       * Perform add operation with forbidden attribute in RDN
       */
      String entryLdif = "dn: displayName=ValueToBeKept," +
        TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1620,9 +1599,8 @@
       */
      /*
       * Perform add operation with fornbidden attribute in RDN
       * Perform add operation with forbidden attribute in RDN
       */
      entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," +
        TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1698,9 +1676,8 @@
      createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
      /*
       * Perform add operation with fornbidden attribute in RDN
       * Perform add operation with forbidden attribute in RDN
       */
      String entryLdif = "dn: displayName=ValueToBeKept," +
        TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1754,9 +1731,8 @@
       */
      /*
       * Perform add operation with fornbidden attribute in RDN
       * Perform add operation with forbidden attribute in RDN
       */
      entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," +
        TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1834,9 +1810,8 @@
      createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
      /*
       * Perform add operation with fornbidden attribute in RDN
       * Perform add operation with forbidden attribute in RDN
       */
      String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ;
      String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1936,7 +1911,7 @@
  @Test
  public void testModifyDnWithForbiddenAttrInRDNInclude()
  {
     String testcase = "testModifyDnWithForbiddenAttrInRDNInclude";
    String testcase = "testModifyDnWithForbiddenAttrInRDNInclude";
    initTest();
@@ -1953,9 +1928,8 @@
      createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
      /*
       * Perform add operation with fornbidden attribute in RDN
       * Perform add operation with forbidden attribute in RDN
       */
      String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ;
      String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -23,13 +23,16 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.TestCaseUtils.*;
import static org.testng.Assert.*;
import java.util.ArrayList;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
@@ -40,24 +43,20 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.RDN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.types.*;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Test the naming conflict resolution code.
 */
@SuppressWarnings("javadoc")
public class NamingConflictTest extends ReplicationTestCase
{
  private static final AtomicBoolean SHUTDOWN = new AtomicBoolean(false);
  /**
   * Test for issue 3402 : test, that a modrdn that is older than an other
   * modrdn but that is applied later is ignored.
@@ -123,10 +122,10 @@
      "uid=simultaneous2");
      // Put the message in the replay queue
      domain.processUpdate(modDnMsg);
      domain.processUpdate(modDnMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // This MODIFY DN uses an older DN and should therefore be cancelled
      // at replay time.
@@ -137,11 +136,11 @@
      "uid=simulatneouswrong");
      // Put the message in the replay queue
      domain.processUpdate(modDnMsg);
      domain.processUpdate(modDnMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      // and resolve conflict
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // Expect the conflict resolution
      assertFalse(DirectoryServer.entryExists(entry.getDN()),
@@ -158,8 +157,6 @@
   * a delete operation has removed one of the conflicting entries
   * the other conflicting entry is correctly renamed to its
   * original name.
   *
   * @throws Exception if the test fails.
   */
  @Test(enabled=true)
  public void conflictCleaningDelete() throws Exception
@@ -215,10 +212,10 @@
            null);
      // Put the message in the replay queue
      domain.processUpdate(addMsg);
      domain.processUpdate(addMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // Now delete the first entry that was added at the beginning
      TestCaseUtils.deleteEntry(entry.getDN());
@@ -226,7 +223,7 @@
      // Expect the conflict resolution : the second entry should now
      // have been renamed with the original DN.
      Entry resultEntry = DirectoryServer.getEntry(entry.getDN());
      assertTrue(resultEntry != null, "The conflict was not cleared");
      assertNotNull(resultEntry, "The conflict was not cleared");
      assertEquals(getEntryUUID(resultEntry.getDN()),
          "c9cb8c3c-615a-4122-865d-50323aaaed48",
          "The wrong entry has been renamed");
@@ -300,10 +297,10 @@
            null);
      // Put the message in the replay queue
      domain.processUpdate(addMsg);
      domain.processUpdate(addMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // Now delete the first entry that was added at the beginning
      InternalClientConnection conn =
@@ -316,7 +313,7 @@
      // Expect the conflict resolution : the second entry should now
      // have been renamed with the original DN.
      Entry resultEntry = DirectoryServer.getEntry(entry.getDN());
      assertTrue(resultEntry != null, "The conflict was not cleared");
      assertNotNull(resultEntry, "The conflict was not cleared");
      assertEquals(getEntryUUID(resultEntry.getDN()),
          "c9cb8c3c-615a-4122-865d-50323aaaed48",
          "The wrong entry has been renamed");
@@ -409,9 +406,9 @@
      delMsg.setSubtreeDelete(true);
      // Put the message in the replay queue
      domain.processUpdate(delMsg);
      domain.processUpdate(delMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // Expect the subtree to be deleted and no conflict entry created
      assertFalse(DirectoryServer.entryExists(parentEntry.getDN()),
@@ -488,9 +485,9 @@
      // NOT SUBTREE
      // Put the message in the replay queue
      domain.processUpdate(delMsg);
      domain.processUpdate(delMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // Expect the parent entry to be deleted
      assertTrue(!DirectoryServer.entryExists(parentEntry.getDN()),
@@ -502,7 +499,6 @@
          "+cn=child,o=test");
      assertTrue(DirectoryServer.entryExists(childDN),
          "Child entry conflict exist with DN="+childDN);
    }
    finally
    {
@@ -572,9 +568,9 @@
          new ArrayList<Attribute>());
      // Put the message in the replay queue
      domain.processUpdate(addMsg);
      domain.processUpdate(addMsg, SHUTDOWN);
      // Make the domain replay the change from the replay queue
      domain.replay(queue.take().getUpdateMessage());
      domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
      // Expect the parent entry to be deleted
      assertFalse(DirectoryServer.entryExists(parentEntry.getDN()),
@@ -586,7 +582,6 @@
          "+cn=child,o=test");
      assertTrue(DirectoryServer.entryExists(childDN),
          "Child entry conflict exist with DN="+childDN);
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -23,36 +23,45 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.service;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import org.opends.server.config.ConfigException;
import java.util.Collection;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.DirectoryException;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.server.types.ResultCode;
/**
 * This class is the minimum implementation of a Concrete ReplicationDomain
 * used to test the Generic Replication Service.
 */
@SuppressWarnings("javadoc")
public class FakeReplicationDomain extends ReplicationDomain
{
  // A blocking queue that is used to send the UpdateMsg received from
  // the Replication Service.
  BlockingQueue<UpdateMsg> queue = null;
  /**
   * A blocking queue that is used to send the UpdateMsg received from the
   * Replication Service.
   */
  private BlockingQueue<UpdateMsg> queue = null;
  // A string that will be exported should exportBackend be called.
  String exportString = null;
  /** A string that will be exported should exportBackend be called. */
  private String exportString = null;
  // A StringBuilder that will be used to build a build a new String should the
  // import be called.
  StringBuilder importString = null;
  /**
   * A StringBuilder that will be used to build a build a new String should the
   * import be called.
   */
  private StringBuilder importString = null;
  private int exportedEntryCount;
@@ -142,7 +151,7 @@
  }
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  {
    if (queue != null)
      queue.add(updateMsg);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -23,36 +23,36 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.service;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import org.opends.server.config.ConfigException;
import java.util.Collection;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.DirectoryException;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.server.types.ResultCode;
/**
 * This class is the minimum implementation of a Concrete ReplicationDomain
 * used to test the Generic Replication Service.
 */
@SuppressWarnings("javadoc")
public class FakeStressReplicationDomain extends ReplicationDomain
{
  // A blocking queue that is used to send the UpdateMsg received from
  // the Replication Service.
  BlockingQueue<UpdateMsg> queue = null;
  // A string that will be exported should exportBackend be called.
  String exportString = null;
  // A StringBuilder that will be used to build a build a new String should the
  // import be called.
  StringBuilder importString = null;
  /**
   * A blocking queue that is used to send the UpdateMsg received from the
   * Replication Service.
   */
  private BlockingQueue<UpdateMsg> queue = null;
  public FakeStressReplicationDomain(
      String serviceID,
@@ -68,23 +68,8 @@
    this.queue = queue;
  }
  public FakeStressReplicationDomain(
      String serviceID,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
      String exportString,
      StringBuilder importString) throws ConfigException
  {
    super(serviceID, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.exportString = exportString;
    this.importString = importString;
  }
  private static final int IMPORT_SIZE = 100000000;
  final int IMPORT_SIZE = 100000000;
  @Override
  public long countEntries() throws DirectoryException
  {
@@ -150,7 +135,7 @@
  }
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  {
    if (queue != null)
      queue.add(updateMsg);