mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
18.18.2008 eb1275dc2c85f9e3db21bfd5a151d4bc0740d336
Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn

2 files added
8 files modified
653 ■■■■ changed files
opends/resource/schema/02-config.ldif 15 ●●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml 26 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java 56 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 114 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java 142 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 219 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java 73 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java 4 ●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -1543,11 +1543,6 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445
  NAME 'ds-cfg-cache-level'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.315
  NAME 'ds-cfg-allow-retrieving-membership'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
@@ -2182,11 +2177,20 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445
  NAME 'ds-cfg-cache-level'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.446
  NAME 'ds-cfg-cache-preload'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.447
  NAME 'ds-cfg-num-update-replay-threads'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler'
  SUP top
@@ -3087,6 +3091,7 @@
  NAME 'ds-cfg-replication-synchronization-provider'
  SUP ds-cfg-synchronization-provider
  STRUCTURAL
  MAY ( ds-cfg-num-update-replay-threads )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.94
  NAME 'ds-cfg-dictionary-password-validator'
opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Portions Copyright 2007 Sun Microsystems, Inc.
  !      Portions Copyright 2007-2008 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="replication-synchronization-provider"
  plural-name="replication-synchronization-providers"
@@ -79,4 +79,28 @@
      </adm:defined>
    </adm:default-behavior>
  </adm:property-override>
  <adm:property name="num-update-replay-threads" mandatory="false" read-only="false">
    <adm:synopsis>
      Specifies the number of update replay threads
    </adm:synopsis>
    <adm:description>
      This is the number of threads created for replaying every updates
      received for all the replication domains.
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>
          10
        </adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="1" upper-limit="65535"></adm:integer>
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-num-update-replay-threads</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/messages/messages/replication.properties
@@ -257,4 +257,6 @@
 are missing due to a processing error : %s
SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
 sending request to get remote monitor data
SEVERE_ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE_109=An Exception was caught \
 while replaying replication message : %s
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -22,9 +22,11 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -48,22 +50,27 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private ReplicationDomain listener;
  private ReplicationDomain repDomain;
  private boolean shutdown = false;
  private boolean done = false;
  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
  /**
   * Constructor for the ListenerThread.
   *
   * @param listener the Plugin that created this thread
   * @param repDomain the replication domain that created this thread
   * @param updateToReplayQueue The update messages queue we must
   * store messages in
   */
  public ListenerThread(ReplicationDomain listener)
  public ListenerThread(ReplicationDomain repDomain,
    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
     super("Replication Listener thread " +
         "serverID=" + listener.serverId +
         " domain=" + listener.getName());
     this.listener = listener;
         "serverID=" + repDomain.serverId +
         " domain=" + repDomain.getName());
     this.repDomain = repDomain;
     this.updateToReplayQueue = updateToReplayQueue;
  }
  /**
@@ -77,31 +84,50 @@
  /**
   * Run method for this class.
   */
  @Override
  public void run()
  {
    UpdateMessage msg;
    UpdateMessage updateMsg = null;
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Listener thread starting.");
    }
    while (shutdown == false)
    while (!shutdown)
    {
      try
      {
        while (((msg = listener.receive()) != null) && (shutdown == false))
        // Loop receiving update messages and puting them in the update message
        // queue
        while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
        {
          listener.replay(msg);
          // Put update message into the queue (block until some place in the
          // queue is available)
          UpdateToReplay updateToReplay =
            new UpdateToReplay(updateMsg, repDomain);
          boolean queued = false;
          while (!queued && !shutdown)
          {
            // Use timedout method (offer) instead of put for being able to
            // shutdown the thread
            queued = updateToReplayQueue.offer(updateToReplay,
              1L, TimeUnit.SECONDS);
        }
        if (msg == null)
          if (!queued)
          {
            // Shutdown requested but could not push message: ensure this one is
            // not lost and put it in the queue before dying
            updateToReplayQueue.offer(updateToReplay);
          }
        }
        if (updateMsg == null)
          shutdown = true;
      } catch (Exception e)
      {
        /*
         * catch all exceptions happening in listener.receive and
         * listener.replay so that the thread never dies even in case
         * of problems.
         * catch all exceptions happening in repDomain.receive so that the
         * thread never dies even in case of problems.
         */
        Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
            stackTraceToSingleLineString(e));
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.ArrayList;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
@@ -82,6 +85,8 @@
       extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
       implements ConfigurationAddListener<ReplicationDomainCfg>,
                  ConfigurationDeleteListener<ReplicationDomainCfg>,
                  ConfigurationChangeListener
                  <ReplicationSynchronizationProviderCfg>,
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
@@ -89,6 +94,23 @@
  private static Map<DN, ReplicationDomain> domains =
    new HashMap<DN, ReplicationDomain>() ;
  /**
   * The queue of received update messages, to be treated by the ReplayThread
   * threads.
   */
  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
    new LinkedBlockingQueue<UpdateToReplay>();
  /**
   * The list of ReplayThread threads.
   */
  private static List<ReplayThread> replayThreads =
    new ArrayList<ReplayThread>();
  /**
   * The configurable number of replay threads.
   */
  private static int replayThreadNumber = 10;
  /**
   * Finds the domain for a given DN.
@@ -167,7 +189,16 @@
      throws ConfigException
  {
    ReplicationDomain domain;
    domain = new ReplicationDomain(configuration);
    domain = new ReplicationDomain(configuration, updateToReplayQueue);
    if (domains.size() == 0)
    {
      /*
       * Create the threads that will process incoming update messages
       */
      createReplayThreads();
    }
    domains.put(domain.getBaseDN(), domain);
    domain.start();
    return domain;
@@ -180,6 +211,12 @@
  public static void deleteDomain(DN dn)
  {
    ReplicationDomain domain = domains.remove(dn);
    // No replay threads running if no replication need
    if (domains.size() == 0) {
      stopReplayThreads();
    }
    if (domain != null)
      domain.shutdown();
  }
@@ -200,6 +237,12 @@
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);
    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);
    replayThreadNumber = configuration.getNumUpdateReplayThreads();
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    {
@@ -228,6 +271,39 @@
  }
  /**
   * Create the threads that will wait for incoming update messages.
   */
  private synchronized static void createReplayThreads()
  {
    replayThreads.clear();
    for (int i = 0; i < replayThreadNumber; i++)
    {
      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
      replayThread.start();
      replayThreads.add(replayThread);
    }
  }
  /**
   * Stope the threads that are waiting for incoming update messages.
   */
  private synchronized static void stopReplayThreads()
  {
    //  stop the replay threads
    for (ReplayThread replayThread : replayThreads)
    {
      replayThread.shutdown();
    }
    for (ReplayThread replayThread : replayThreads)
    {
      replayThread.waitForShutdown();
    }
    replayThreads.clear();
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationAddAcceptable(
@@ -431,6 +507,9 @@
  @Override
  public void finalizeSynchronizationProvider()
  {
    // Stop replay threads
    stopReplayThreads();
    // shutdown all the domains
    for (ReplicationDomain domain : domains.values())
    {
@@ -621,6 +700,37 @@
  {
    return replicationServerListener;
  }
  /**
   * {@inheritDoc}
   */
  public boolean
    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
    configuration,
    List<Message> unacceptableReasons)
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult
    applyConfigurationChange
    (ReplicationSynchronizationProviderCfg configuration)
  {
    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
    // Stop threads then restart new number of threads
    stopReplayThreads();
    replayThreadNumber = numUpdateRepayThread;
    if (domains.size() > 0)
    {
      createReplayThreads();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
}
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -158,7 +158,7 @@
  {
    /*
     * Parse the list of Update with dependencies and check if the dependencies
     * are now cleared until an Update withour dependencies is found.
     * are now cleared until an Update without dependencies is found.
     */
    for (PendingChange change : dependentChanges)
    {
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
New file
@@ -0,0 +1,142 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.UpdateMessage;
/**
 * Thread that is used to get message from the replication servers (stored
 * in the updates queue) and replay them in the current server. A configurable
 * number of this thread is created for the whole MultimasterReplication object
 * (i.e: these threads are shared accross the ReplicationDomain objects for
 * replaying the updates they receive)
 */
public class ReplayThread extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
  private boolean shutdown = false;
  private boolean done = false;
  /**
   * Constructor for the ReplayThread.
   *
   * @param updateToReplayQueue The queue of update messages we have to replay
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
     super("Replication Replay thread");
     this.updateToReplayQueue = updateToReplayQueue;
  }
  /**
   * Shutdown this replay thread.
   */
  public void shutdown()
  {
    shutdown = true;
  }
  /**
   * Run method for this class.
   */
  @Override
  public void run()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Replay thread starting.");
    }
    UpdateToReplay updateToreplay = null;
    while (!shutdown)
    {
      try
      {
        // Loop getting an updateToReplayQueue from the update message queue and
        // replaying matching changes
        while ( (!shutdown) &&
          ((updateToreplay = updateToReplayQueue.poll(1L,
          TimeUnit.SECONDS)) != null))
        {
          // Find replication domain for that update message
          UpdateMessage updateMsg = updateToreplay.getUpdateMessage();
          ReplicationDomain domain = updateToreplay.getReplicationDomain();
          domain.replay(updateMsg);
        }
      } catch (Exception e)
      {
        /*
         * catch all exceptions happening so that the thread never dies even
         * in case of problems.
         */
        Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get(
            stackTraceToSingleLineString(e));
        logError(message);
      }
    }
    done = true;
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Replay thread stopping.");
    }
  }
  /**
   * Wait for the completion of this thread.
   */
  public void waitForShutdown()
  {
    try
    {
      while (done == false)
      {
        Thread.sleep(50);
      }
    } catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -51,6 +51,7 @@
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
@@ -172,18 +173,16 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * on shutdown, the server will wait for existing threads to stop
   * during this timeout (in ms).
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  private ReplicationMonitor monitor;
  private ReplicationBroker broker;
  private List<ListenerThread> synchroThreads =
    new ArrayList<ListenerThread>();
  // Thread waiting for incoming update messages for this domain and pushing
  // them to the global incoming update message queue for later processing by
  // replay threads.
  private ListenerThread listenerThread;
  // The update to replay message queue where the listener thread is going to
  // push incoming update messages.
  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
  private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMessage>();
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
@@ -233,8 +232,6 @@
  // Null when none is being processed.
  private IEContext ieContext = null;
  private int listenerThreadNumber = 10;
  private Collection<String> replicationServers;
  private DN baseDN;
@@ -355,12 +352,59 @@
  }
  /**
   * This thread is launched when we want to export data to another server that
   * has requested to be initialized with the data of our backend.
   */
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will receive updates
    private short target;
    /**
     * Constructor for the ExportThread.
     *
     * @param target Id of server that will receive updates
     */
    public ExportThread(short target)
    {
      super("Export thread");
      this.target = target;
    }
    /**
     * Run method for this class.
     */
    public void run()
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread starting.");
      }
      try
      {
        initializeRemote(target, target, null);
      } catch (DirectoryException de)
      {
      // An error message has been sent to the peer
      // Nothing more to do locally
      }
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread stopping.");
      }
    }
  }
  /**
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration    The configuration of this ReplicationDomain.
   * @param updateToReplayQueue The queue for update messages to replay.
   * @throws ConfigException In case of invalid configuration.
   */
  public ReplicationDomain(ReplicationDomainCfg configuration)
  public ReplicationDomain(ReplicationDomainCfg configuration,
    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
    throws ConfigException
  {
    super("replicationDomain_" + configuration.getBaseDN());
@@ -373,6 +417,7 @@
    heartbeatInterval = configuration.getHeartbeatInterval();
    isolationpolicy = configuration.getIsolationPolicy();
    configDn = configuration.dn();
    this.updateToReplayQueue = updateToReplayQueue;
    /*
     * Modify conflicts are solved for all suffixes but the schema suffix
@@ -819,10 +864,8 @@
   */
  public UpdateMessage receive()
  {
    UpdateMessage update = remotePendingChanges.getNextUpdate();
    UpdateMessage update = null;
    if (update == null)
    {
      while (update == null)
      {
        InitializeRequestMessage initMsg = null;
@@ -919,21 +962,13 @@
        // when we are doing and export and so that a possible
        // closure of the socket happening when we are publishing the
        // entries to the remote can be handled by the other
        // ListenerThread when they call this method and therefore the
      // replay thread when they call this method and therefore the
        // broker.receive() method.
        if (initMsg != null)
        {
          try
          {
            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
                null);
          }
          catch(DirectoryException de)
          {
            // An error message has been sent to the peer
            // Nothing more to do locally
          }
        }
        // Do this work in a thread to allow replay thread continue working
        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
        exportThread.start();
      }
    }
    return update;
@@ -1190,10 +1225,9 @@
  @Override
  public void run()
  {
    /*
     * create the threads that will wait for incoming changes.
     */
    createListeners();
    // Create the listener thread
    listenerThread = new ListenerThread(this, updateToReplayQueue);
    listenerThread.start();
    while (shutdown  == false)
    {
@@ -1217,28 +1251,6 @@
  }
  /**
   * create the threads that will wait for incoming changes.
   * TODO : should use a pool of threads shared between all the servers
   * TODO : need to make number of thread configurable
   */
  private void createListeners()
  {
    synchronized (synchroThreads)
    {
      if (!shutdown)
      {
        synchroThreads.clear();
        for (int i=0; i<listenerThreadNumber; i++)
        {
          ListenerThread myThread = new ListenerThread(this);
          myThread.start();
          synchroThreads.add(myThread);
        }
      }
    }
  }
  /**
   * Shutdown this ReplicationDomain.
   */
  public void shutdown()
@@ -1246,14 +1258,8 @@
    // stop the flush thread
    shutdown = true;
    synchronized (synchroThreads)
    {
      // stop the listener threads
      for (ListenerThread thread : synchroThreads)
      {
        thread.shutdown();
      }
    }
    // Stop the listener thread
    listenerThread.shutdown();
    synchronized (this)
    {
@@ -1267,11 +1273,8 @@
    // stop the ReplicationBroker
    broker.stop();
    //  wait for the listener thread to stop
    for (ListenerThread thread : synchroThreads)
    {
      thread.waitForShutdown();
    }
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
    // wait for completion of the persistentServerState thread.
    try
@@ -1315,6 +1318,10 @@
    int retryCount = 10;
    boolean firstTry = true;
    // Try replay the operation, then flush (replaying) any pending operation
    // whose dependency has been replayed until no more left.
    do
    {
    try
    {
      while ((!dependency) && (!done) && (retryCount-- > 0))
@@ -1326,6 +1333,7 @@
        changeNumber = OperationContext.getChangeNumber(op);
        ((AbstractOperation)op).run();
          // Try replay the operation
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
@@ -1338,8 +1346,7 @@
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof DeleteOperation)
            } else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            dependency = remotePendingChanges.checkDependencies(newOp);
@@ -1347,8 +1354,7 @@
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof AddOperation)
            } else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            AddMsg addMsg = (AddMsg) msg;
@@ -1357,8 +1363,7 @@
            {
              done = solveNamingConflict(newOp, addMsg);
            }
          }
          else if (op instanceof ModifyDNOperationBasis)
            } else if (op instanceof ModifyDNOperationBasis)
          {
            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
            dependency = remotePendingChanges.checkDependencies(newMsg);
@@ -1367,8 +1372,7 @@
              ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
              done = solveNamingConflict(newOp, msg);
            }
          }
          else
            } else
          {
            done = true;  // unknown type of operation ?!
          }
@@ -1379,8 +1383,7 @@
            // however we still need to push this change to the serverState
            updateError(changeNumber);
          }
        }
        else
          } else
        {
          done = true;
        }
@@ -1399,26 +1402,22 @@
        updateError(changeNumber);
      }
    }
    catch (ASN1Exception e)
      } catch (ASN1Exception e)
    {
      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
              String.valueOf(msg) + stackTraceToSingleLineString(e));
      logError(message);
    }
    catch (LDAPException e)
      } catch (LDAPException e)
    {
      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
              String.valueOf(msg) + stackTraceToSingleLineString(e));
      logError(message);
    }
    catch (DataFormatException e)
      } catch (DataFormatException e)
    {
      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
              String.valueOf(msg) + stackTraceToSingleLineString(e));
      logError(message);
    }
    catch (Exception e)
      } catch (Exception e)
    {
      if (changeNumber != null)
      {
@@ -1432,15 +1431,13 @@
            stackTraceToSingleLineString(e), op.toString());
        logError(message);
        updateError(changeNumber);
      }
      else
        } else
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
                String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
      }
    }
    finally
      } finally
    {
      if (!dependency)
      {
@@ -1449,6 +1446,20 @@
        incProcessedUpdates();
      }
    }
      // Now replay any pending update that had a dependency and whose
      // dependency has been replayed, do that until no more updates of that
      // type left...
      msg = remotePendingChanges.getNextUpdate();
      // Prepare restart of loop
      done = false;
      dependency = false;
      changeNumber = null;
      retryCount = 10;
      firstTry = true;
    } while (msg != null);
  }
  /**
@@ -2224,7 +2235,7 @@
   * The session to the replication server will be stopped.
   * The domain will not be destroyed but call to the pre-operation
   * methods will result in failure.
   * The listener threads will be destroyed.
   * The listener thread will be destroyed.
   * The monitor informations will still be accessible.
   */
  public void disable()
@@ -2232,23 +2243,14 @@
    state.save();
    state.clearInMemory();
    disabled = true;
    //  stop the listener threads
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
    }
    broker.stop(); // this will cut the session and wake-up the listeners
    for (ListenerThread thread : synchroThreads)
    {
      try
      {
        thread.join(SHUTDOWN_JOIN_TIMEOUT);
      } catch (InterruptedException e)
      {
        // ignore
      }
    }
    // Stop the listener thread
    listenerThread.shutdown();
    broker.stop(); // This will cut the session and wake up the listener
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
  }
  /**
@@ -2265,7 +2267,6 @@
    state.loadState();
    disabled = false;
    try
    {
      generationId = loadGenerationId();
@@ -2288,7 +2289,9 @@
    broker.start(replicationServers);
    createListeners();
    // Create the listener thread
    listenerThread = new ListenerThread(this, updateToReplayQueue);
    listenerThread.start();
  }
  /**
opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java
New file
@@ -0,0 +1,73 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.server.replication.protocol.UpdateMessage;
/**
 * This is a bag class to hold an update to replay in the queue of updates to
 * be replayed by the replay threads.
 * It associates an update message to replay with the matching
 * ReplicationDomain.
 */
public class UpdateToReplay
{
  private UpdateMessage updateMessage = null;
  private ReplicationDomain replicationDomain = null;
  /**
   * Construct the object associating the update message with the replication
   * domain that must be used to replay it (the on it comes from).
   * @param updateMessage The update message
   * @param replicationDomain The replication domain to use for replaying the
   * change from the update message
   */
  public UpdateToReplay(UpdateMessage updateMessage,
    ReplicationDomain replicationDomain)
  {
    this.updateMessage = updateMessage;
    this.replicationDomain = replicationDomain;
  }
  /**
   * Getter for update message.
   * @return The update message
   */
  public UpdateMessage getUpdateMessage()
  {
    return updateMessage;
  }
  /**
   * Getter for replication domain.
   * @return The replication domain
   */
  public ReplicationDomain getReplicationDomain()
  {
    return replicationDomain;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -420,7 +420,7 @@
   * To increase the risks of failures a loop of add/del/add is done.
   */
  @SuppressWarnings("unchecked")
  @Test(enabled=false, groups="slow")
  @Test(enabled=true, groups="slow")
  public void addDelAddDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
@@ -552,7 +552,7 @@
   * issuing a set of Add operation followed by a modrdn of the added entry.
   */
  @SuppressWarnings("unchecked")
  @Test(enabled=false, groups="slow")
  @Test(enabled=true, groups="slow")
  public void addModdnDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;