From eb1275dc2c85f9e3db21bfd5a151d4bc0740d336 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 18 Jan 2008 10:18:24 +0000
Subject: [PATCH] Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn

---
 opends/resource/schema/02-config.ldif                                                                 |   15 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java           |    4 
 opends/src/server/org/opends/server/replication/plugin/ListenerThread.java                            |   56 ++
 opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java                    |  114 ++++++
 opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java                      |    2 
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java                         |  565 ++++++++++++++++----------------
 opends/src/messages/messages/replication.properties                                                   |    2 
 opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml |   26 +
 opends/src/server/org/opends/server/replication/plugin/ReplayThread.java                              |  142 ++++++++
 opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java                            |   73 ++++
 10 files changed, 692 insertions(+), 307 deletions(-)
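
Overview of the change: instead of creating 10 ListenerThread instances per
replication domain, each domain now starts a single listener thread that
pushes the updates it receives into one queue shared by all domains, while a
configurable pool of ReplayThread instances (default 10, exposed through the
new num-update-replay-threads property) consumes from that queue. The sketch
below only illustrates the intended producer/consumer model; the class and
member names (ReplayPoolSketch, startReplayThreads, enqueue) are simplified
placeholders, not the actual OpenDS types.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Standalone sketch of the new threading model: per-domain listener
    // threads produce into one shared queue; a configurable pool of replay
    // threads consumes from it. Names are placeholders, not OpenDS types.
    public class ReplayPoolSketch
    {
      private final LinkedBlockingQueue<String> queue =
          new LinkedBlockingQueue<String>();
      private final List<Thread> replayThreads = new ArrayList<Thread>();
      private volatile boolean shutdown = false;

      // Start the configured number of consumer threads (default is 10).
      public void startReplayThreads(int numReplayThreads)
      {
        for (int i = 0; i < numReplayThreads; i++)
        {
          Thread t = new Thread(new Runnable()
          {
            public void run()
            {
              while (!shutdown)
              {
                try
                {
                  // Poll with a timeout so the thread notices shutdown.
                  String update = queue.poll(1L, TimeUnit.SECONDS);
                  if (update != null)
                  {
                    // replay the update against its domain here
                  }
                }
                catch (InterruptedException e)
                {
                  // loop again and re-check the shutdown flag
                }
              }
            }
          });
          t.start();
          replayThreads.add(t);
        }
      }

      // Called by each per-domain listener thread (the producer side).
      public void enqueue(String update) throws InterruptedException
      {
        boolean queued = false;
        while (!queued && !shutdown)
        {
          // Offer with a timeout so producers also notice shutdown.
          queued = queue.offer(update, 1L, TimeUnit.SECONDS);
        }
      }

      // Stop and join all consumer threads.
      public void stopReplayThreads() throws InterruptedException
      {
        shutdown = true;
        for (Thread t : replayThreads)
        {
          t.join();
        }
        replayThreads.clear();
      }
    }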

diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index e818a99..8bf2a14 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -1543,11 +1543,6 @@
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
   SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445
-  NAME 'ds-cfg-cache-level'
-  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
-  SINGLE-VALUE
-  X-ORIGIN 'OpenDS Directory Server' )
 attributeTypes: ( 1.3.6.1.4.1.26027.1.1.315
   NAME 'ds-cfg-allow-retrieving-membership'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
@@ -2182,11 +2177,20 @@
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
   SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445
+  NAME 'ds-cfg-cache-level'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 attributeTypes: ( 1.3.6.1.4.1.26027.1.1.446
   NAME 'ds-cfg-cache-preload'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
   SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.447
+  NAME 'ds-cfg-num-update-replay-threads'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler'
   SUP top
@@ -3087,6 +3091,7 @@
   NAME 'ds-cfg-replication-synchronization-provider'
   SUP ds-cfg-synchronization-provider
   STRUCTURAL
+  MAY ( ds-cfg-num-update-replay-threads )
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.94
   NAME 'ds-cfg-dictionary-password-validator'
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
index 330ed53..41258c9 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@
   ! CDDL HEADER END
   !
   !
-  !      Portions Copyright 2007 Sun Microsystems, Inc.
+  !      Portions Copyright 2007-2008 Sun Microsystems, Inc.
   ! -->
 <adm:managed-object name="replication-synchronization-provider"
   plural-name="replication-synchronization-providers"
@@ -79,4 +79,28 @@
       </adm:defined>
     </adm:default-behavior>
   </adm:property-override>
+  <adm:property name="num-update-replay-threads" mandatory="false" read-only="false">
+    <adm:synopsis>
+      Specifies the number of update replay threads
+    </adm:synopsis>
+    <adm:description>
+      This is the number of threads created for replaying all the update
+      messages received for all the replication domains.
+    </adm:description>
+    <adm:default-behavior>
+      <adm:defined>
+        <adm:value>
+          10
+        </adm:value>
+      </adm:defined>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:integer lower-limit="1" upper-limit="65535"></adm:integer>
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-num-update-replay-threads</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
 </adm:managed-object>
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index d47fa34..dba9531 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -257,4 +257,6 @@
  are missing due to a processing error : %s
 SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
  sending request to get remote monitor data
+SEVERE_ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE_109=An Exception was caught \
+ while replaying replication message : %s
 
diff --git a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
index 9be5ae9..d92eb19 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -22,9 +22,11 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import org.opends.messages.Message;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
@@ -48,22 +50,27 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private ReplicationDomain listener;
+  private ReplicationDomain repDomain;
   private boolean shutdown = false;
   private boolean done = false;
+  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
 
 
   /**
    * Constructor for the ListenerThread.
    *
-   * @param listener the Plugin that created this thread
+   * @param repDomain the replication domain that created this thread
+   * @param updateToReplayQueue The queue where received update messages
+   * must be stored
    */
-  public ListenerThread(ReplicationDomain listener)
+  public ListenerThread(ReplicationDomain repDomain,
+    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
   {
      super("Replication Listener thread " +
-         "serverID=" + listener.serverId +
-         " domain=" + listener.getName());
-     this.listener = listener;
+         "serverID=" + repDomain.serverId +
+         " domain=" + repDomain.getName());
+     this.repDomain = repDomain;
+     this.updateToReplayQueue = updateToReplayQueue;
   }
 
   /**
@@ -77,31 +84,50 @@
   /**
    * Run method for this class.
    */
+  @Override
   public void run()
   {
-    UpdateMessage msg;
+    UpdateMessage updateMsg = null;
 
     if (debugEnabled())
     {
       TRACER.debugInfo("Replication Listener thread starting.");
     }
 
-    while (shutdown == false)
+    while (!shutdown)
     {
       try
       {
-        while (((msg = listener.receive()) != null) && (shutdown == false))
+        // Loop receiving update messages and putting them in the update
+        // message queue
+        while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
         {
-          listener.replay(msg);
+          // Put the update message into the queue (block until space is
+          // available in the queue)
+          UpdateToReplay updateToReplay =
+            new UpdateToReplay(updateMsg, repDomain);
+          boolean queued = false;
+          while (!queued && !shutdown)
+          {
+            // Use the timed offer method instead of put so that the thread
+            // can still be shut down while waiting
+            queued = updateToReplayQueue.offer(updateToReplay,
+              1L, TimeUnit.SECONDS);
+          }
+          if (!queued)
+          {
+            // Shutdown requested but could not push message: ensure this one is
+            // not lost and put it in the queue before dying
+            updateToReplayQueue.offer(updateToReplay);
+          }
         }
-        if (msg == null)
+        if (updateMsg == null)
           shutdown = true;
       } catch (Exception e)
       {
         /*
-         * catch all exceptions happening in listener.receive and
-         * listener.replay so that the thread never dies even in case
-         * of problems.
+         * catch all exceptions happening in repDomain.receive so that the
+         * thread never dies even in case of problems.
          */
         Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
             stackTraceToSingleLineString(e));
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 001484d..a69f1b4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
+import java.util.ArrayList;
 import static org.opends.server.replication.plugin.
 ReplicationRepairRequestControl.*;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationAddListener;
+import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.server.ConfigurationDeleteListener;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
@@ -82,6 +85,8 @@
        extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
        implements ConfigurationAddListener<ReplicationDomainCfg>,
                   ConfigurationDeleteListener<ReplicationDomainCfg>,
+                  ConfigurationChangeListener
+                  <ReplicationSynchronizationProviderCfg>,
                   BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                   ExportTaskListener
 {
@@ -89,6 +94,23 @@
   private static Map<DN, ReplicationDomain> domains =
     new HashMap<DN, ReplicationDomain>() ;
 
+  /**
+   * The queue of received update messages, to be treated by the ReplayThread
+   * threads.
+   */
+  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
+    new LinkedBlockingQueue<UpdateToReplay>();
+
+  /**
+   * The list of ReplayThread threads.
+   */
+  private static List<ReplayThread> replayThreads =
+    new ArrayList<ReplayThread>();
+
+  /**
+   * The configurable number of replay threads.
+   */
+  private static int replayThreadNumber = 10;
 
   /**
    * Finds the domain for a given DN.
@@ -167,7 +189,16 @@
       throws ConfigException
   {
     ReplicationDomain domain;
-    domain = new ReplicationDomain(configuration);
+    domain = new ReplicationDomain(configuration, updateToReplayQueue);
+
+    if (domains.size() == 0)
+    {
+      /*
+       * Create the threads that will process incoming update messages
+       */
+      createReplayThreads();
+    }
+
     domains.put(domain.getBaseDN(), domain);
     domain.start();
     return domain;
@@ -180,6 +211,12 @@
   public static void deleteDomain(DN dn)
   {
     ReplicationDomain domain = domains.remove(dn);
+
+    // Stop the replay threads when no replication domain remains
+    if (domains.size() == 0) {
+      stopReplayThreads();
+    }
+
     if (domain != null)
       domain.shutdown();
   }
@@ -200,6 +237,12 @@
     configuration.addReplicationDomainAddListener(this);
     configuration.addReplicationDomainDeleteListener(this);
 
+    // Register as a configuration change listener so that we are notified
+    // when the number of replay threads changes and can apply the change.
+    configuration.addReplicationChangeListener(this);
+
+    replayThreadNumber = configuration.getNumUpdateReplayThreads();
+
     //  Create the list of domains that are already defined.
     for (String name : configuration.listReplicationDomains())
     {
@@ -228,6 +271,39 @@
   }
 
   /**
+   * Create the threads that will wait for incoming update messages.
+   */
+  private synchronized static void createReplayThreads()
+  {
+    replayThreads.clear();
+
+    for (int i = 0; i < replayThreadNumber; i++)
+    {
+      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
+      replayThread.start();
+      replayThreads.add(replayThread);
+    }
+  }
+
+  /**
+   * Stop the threads that are waiting for incoming update messages.
+   */
+  private synchronized static void stopReplayThreads()
+  {
+    //  stop the replay threads
+    for (ReplayThread replayThread : replayThreads)
+    {
+      replayThread.shutdown();
+    }
+
+    for (ReplayThread replayThread : replayThreads)
+    {
+      replayThread.waitForShutdown();
+    }
+    replayThreads.clear();
+  }
+
+  /**
    * {@inheritDoc}
    */
   public boolean isConfigurationAddAcceptable(
@@ -431,6 +507,9 @@
   @Override
   public void finalizeSynchronizationProvider()
   {
+    // Stop replay threads
+    stopReplayThreads();
+
     // shutdown all the domains
     for (ReplicationDomain domain : domains.values())
     {
@@ -621,6 +700,37 @@
   {
     return replicationServerListener;
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean
+    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
+    configuration,
+    List<Message> unacceptableReasons)
+  {
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public ConfigChangeResult
+    applyConfigurationChange
+    (ReplicationSynchronizationProviderCfg configuration)
+  {
+    int numUpdateReplayThreads = configuration.getNumUpdateReplayThreads();
+
+    // Stop threads then restart new number of threads
+    stopReplayThreads();
+    replayThreadNumber = numUpdateReplayThreads;
+    if (domains.size() > 0)
+    {
+      createReplayThreads();
+    }
+
+    return new ConfigChangeResult(ResultCode.SUCCESS, false);
+  }
 }
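
Note on dynamic resizing: applyConfigurationChange() above stops and joins the
current replay threads, records the new thread count, and recreates the pool;
the shared update queue is left untouched, so updates received while no replay
thread is running are simply processed once the new pool starts. Building on
the hypothetical ReplayPoolSketch shown after the diffstat, the same idea
reduces to a method like the following (illustrative only, not part of the
patch):

    // Hypothetical resize helper mirroring applyConfigurationChange():
    // stop the current consumers, then start the newly configured number.
    // The queue itself is never cleared, so no queued update is lost.
    public synchronized void resizeReplayPool(int newSize)
        throws InterruptedException
    {
      stopReplayThreads();
      startReplayThreads(newSize);
    }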
 
 
diff --git a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index 9327406..72056f9 100644
--- a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -158,7 +158,7 @@
   {
     /*
      * Parse the list of Update with dependencies and check if the dependencies
-     * are now cleared until an Update withour dependencies is found.
+     * are now cleared until an Update without dependencies is found.
      */
     for (PendingChange change : dependentChanges)
     {
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
new file mode 100644
index 0000000..3774933
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -0,0 +1,142 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.opends.messages.Message;
+
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.protocol.UpdateMessage;
+
+/**
+ * Thread that is used to take update messages received from the replication
+ * servers (stored in the updates queue) and replay them in the current
+ * server. A configurable number of these threads are created for the whole
+ * MultimasterReplication object (i.e. these threads are shared across the
+ * ReplicationDomain objects for replaying the updates they receive).
+ */
+public class ReplayThread extends DirectoryThread
+{
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
+  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
+  private boolean shutdown = false;
+  private boolean done = false;
+
+  /**
+   * Constructor for the ReplayThread.
+   *
+   * @param updateToReplayQueue The queue of update messages we have to replay
+   */
+  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
+  {
+     super("Replication Replay thread");
+     this.updateToReplayQueue = updateToReplayQueue;
+  }
+
+  /**
+   * Shutdown this replay thread.
+   */
+  public void shutdown()
+  {
+    shutdown = true;
+  }
+
+  /**
+   * Run method for this class.
+   */
+  @Override
+  public void run()
+  {
+
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("Replication Replay thread starting.");
+    }
+
+    UpdateToReplay updateToReplay = null;
+
+    while (!shutdown)
+    {
+      try
+      {
+        // Loop taking an UpdateToReplay from the update message queue and
+        // replaying the matching change
+        while ( (!shutdown) &&
+          ((updateToReplay = updateToReplayQueue.poll(1L,
+          TimeUnit.SECONDS)) != null))
+        {
+          // Find replication domain for that update message
+          UpdateMessage updateMsg = updateToReplay.getUpdateMessage();
+          ReplicationDomain domain = updateToReplay.getReplicationDomain();
+          domain.replay(updateMsg);
+        }
+      } catch (Exception e)
+      {
+        /*
+         * catch all exceptions happening so that the thread never dies even
+         * in case of problems.
+         */
+        Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get(
+            stackTraceToSingleLineString(e));
+        logError(message);
+      }
+    }
+    done = true;
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("Replication Replay thread stopping.");
+    }
+  }
+
+  /**
+   * Wait for the completion of this thread.
+   */
+  public void waitForShutdown()
+  {
+    try
+    {
+      while (done == false)
+      {
+        Thread.sleep(50);
+      }
+    } catch (InterruptedException e)
+    {
+      // exit the loop if this thread is interrupted.
+    }
+  }
+}
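
Note on the shutdown()/waitForShutdown() pair above: the shutdown and done
fields are plain booleans read and written from different threads, so their
visibility relies on the surrounding calls and on timing. A stricter
standalone variant of the same stop-and-wait pattern would mark the flags
volatile; the sketch below is illustrative only and is not part of this patch:

    // Minimal stop-and-wait sketch with volatile flags (an assumption about
    // intent, not the actual ReplayThread code).
    public class StopAndWaitSketch extends Thread
    {
      private volatile boolean shutdown = false;
      private volatile boolean done = false;

      @Override
      public void run()
      {
        while (!shutdown)
        {
          // ... poll the queue and replay updates here ...
        }
        done = true;
      }

      public void initiateShutdown()
      {
        shutdown = true;
      }

      public void waitForShutdown() throws InterruptedException
      {
        while (!done)
        {
          Thread.sleep(50);
        }
      }
    }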
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 391c238..8e4efd0 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 import static org.opends.messages.ReplicationMessages.*;
@@ -51,6 +51,7 @@
 import java.util.NoSuchElementException;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.Adler32;
 import java.util.zip.CheckedOutputStream;
@@ -172,18 +173,16 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  /**
-   * on shutdown, the server will wait for existing threads to stop
-   * during this timeout (in ms).
-   */
-  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
-
   private ReplicationMonitor monitor;
 
   private ReplicationBroker broker;
-
-  private List<ListenerThread> synchroThreads =
-    new ArrayList<ListenerThread>();
+  // Thread waiting for incoming update messages for this domain and pushing
+  // them to the global incoming update message queue for later processing by
+  // replay threads.
+  private ListenerThread listenerThread;
+  // The update to replay message queue where the listener thread is going to
+  // push incoming update messages.
+  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
   private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
     new TreeMap<ChangeNumber, UpdateMessage>();
   private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
@@ -233,8 +232,6 @@
   // Null when none is being processed.
   private IEContext ieContext = null;
 
-  private int listenerThreadNumber = 10;
-
   private Collection<String> replicationServers;
 
   private DN baseDN;
@@ -355,12 +352,59 @@
   }
 
   /**
+   * This thread is launched when we want to export data to another server that
+   * has requested to be initialized with the data of our backend.
+   */
+  private class ExportThread extends DirectoryThread
+  {
+    // Id of server that will receive updates
+    private short target;
+
+    /**
+     * Constructor for the ExportThread.
+     *
+     * @param target Id of server that will receive updates
+     */
+    public ExportThread(short target)
+    {
+      super("Export thread");
+      this.target = target;
+    }
+
+    /**
+     * Run method for this class.
+     */
+    public void run()
+    {
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Export thread starting.");
+      }
+
+      try
+      {
+        initializeRemote(target, target, null);
+      } catch (DirectoryException de)
+      {
+        // An error message has been sent to the peer
+        // Nothing more to do locally
+      }
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Export thread stopping.");
+      }
+    }
+  }
+
+  /**
    * Creates a new ReplicationDomain using configuration from configEntry.
    *
    * @param configuration    The configuration of this ReplicationDomain.
+   * @param updateToReplayQueue The queue for update messages to replay.
    * @throws ConfigException In case of invalid configuration.
    */
-  public ReplicationDomain(ReplicationDomainCfg configuration)
+  public ReplicationDomain(ReplicationDomainCfg configuration,
+    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
     throws ConfigException
   {
     super("replicationDomain_" + configuration.getBaseDN());
@@ -373,6 +417,7 @@
     heartbeatInterval = configuration.getHeartbeatInterval();
     isolationpolicy = configuration.getIsolationPolicy();
     configDn = configuration.dn();
+    this.updateToReplayQueue = updateToReplayQueue;
 
     /*
      * Modify conflicts are solved for all suffixes but the schema suffix
@@ -819,122 +864,112 @@
    */
   public UpdateMessage receive()
   {
-    UpdateMessage update = remotePendingChanges.getNextUpdate();
+    UpdateMessage update = null;
 
-    if (update == null)
+    while (update == null)
     {
-      while (update == null)
+      InitializeRequestMessage initMsg = null;
+      synchronized (broker)
       {
-        InitializeRequestMessage initMsg = null;
-        synchronized (broker)
+        ReplicationMessage msg;
+        try
         {
-          ReplicationMessage msg;
-          try
+          msg = broker.receive();
+          if (msg == null)
           {
-            msg = broker.receive();
-            if (msg == null)
-            {
-              // The server is in the shutdown process
-              return null;
-            }
+            // The server is in the shutdown process
+            return null;
+          }
 
-            if (debugEnabled())
-              if (!(msg instanceof HeartbeatMessage))
-                TRACER.debugVerbose("Message received <" + msg + ">");
+          if (debugEnabled())
+            if (!(msg instanceof HeartbeatMessage))
+              TRACER.debugVerbose("Message received <" + msg + ">");
 
-            if (msg instanceof AckMessage)
-            {
-              AckMessage ack = (AckMessage) msg;
-              receiveAck(ack);
+          if (msg instanceof AckMessage)
+          {
+            AckMessage ack = (AckMessage) msg;
+            receiveAck(ack);
             }
             else if (msg instanceof InitializeRequestMessage)
-            {
-              // Another server requests us to provide entries
-              // for a total update
+          {
+            // Another server requests us to provide entries
+            // for a total update
               initMsg = (InitializeRequestMessage)msg;
             }
             else if (msg instanceof InitializeTargetMessage)
-            {
-              // Another server is exporting its entries to us
-              InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+          {
+            // Another server is exporting its entries to us
+            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
 
-              try
-              {
-                // This must be done while we are still holding the
-                // broker lock because we are now going to receive a
-                // bunch of entries from the remote server and we
-                // want the import thread to catch them and
-                // not the ListenerThread.
-                initialize(importMsg);
+            try
+            {
+              // This must be done while we are still holding the
+              // broker lock because we are now going to receive a
+              // bunch of entries from the remote server and we
+              // want the import thread to catch them and
+              // not the ListenerThread.
+              initialize(importMsg);
               }
               catch(DirectoryException de)
-              {
-                // Returns an error message to notify the sender
-                ErrorMessage errorMsg =
-                  new ErrorMessage(importMsg.getsenderID(),
-                                   de.getMessageObject());
-                MessageBuilder mb = new MessageBuilder();
-                mb.append(de.getMessageObject());
-                TRACER.debugInfo(Message.toString(mb.toMessage()));
-                broker.publish(errorMsg);
-              }
+            {
+              // Returns an error message to notify the sender
+              ErrorMessage errorMsg =
+                new ErrorMessage(importMsg.getsenderID(),
+                de.getMessageObject());
+              MessageBuilder mb = new MessageBuilder();
+              mb.append(de.getMessageObject());
+              TRACER.debugInfo(Message.toString(mb.toMessage()));
+              broker.publish(errorMsg);
+            }
             }
             else if (msg instanceof ErrorMessage)
+          {
+            if (ieContext != null)
             {
-              if (ieContext != null)
-              {
-                // This is an error termination for the 2 following cases :
-                // - either during an export
-                // - or before an import really started
-                //   For example, when we publish a request and the
-                //  replicationServer did not find any import source.
+              // This is an error termination for the 2 following cases :
+              // - either during an export
+              // - or before an import really started
+              //   For example, when we publish a request and the
+              //  replicationServer did not find any import source.
                 abandonImportExport((ErrorMessage)msg);
               }
               else
-              {
-                /* We can receive an error message from the replication server
-                 * in the following cases :
-                 * - we connected with an incorrect generation id
-                 */
+            {
+              /* We can receive an error message from the replication server
+               * in the following cases :
+               * - we connected with an incorrect generation id
+               */
                 ErrorMessage errorMsg = (ErrorMessage)msg;
-                logError(ERR_ERROR_MSG_RECEIVED.get(
-                           errorMsg.getDetails()));
-              }
+              logError(ERR_ERROR_MSG_RECEIVED.get(
+                errorMsg.getDetails()));
+            }
             }
             else if (msg instanceof UpdateMessage)
-            {
-              update = (UpdateMessage) msg;
-              receiveUpdate(update);
-            }
+          {
+            update = (UpdateMessage) msg;
+            receiveUpdate(update);
+          }
           }
           catch (SocketTimeoutException e)
-          {
-            // just retry
-          }
-        }
-        // Test if we have received and export request message and
-        // if that's the case handle it now.
-        // This must be done outside of the portion of code protected
-        // by the broker lock so that we keep receiveing update
-        // when we are doing and export and so that a possible
-        // closure of the socket happening when we are publishing the
-        // entries to the remote can be handled by the other
-        // ListenerThread when they call this method and therefore the
-        // broker.receive() method.
-        if (initMsg != null)
         {
-          try
-          {
-            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
-                null);
-          }
-          catch(DirectoryException de)
-          {
-            // An error message has been sent to the peer
-            // Nothing more to do locally
-          }
+        // just retry
         }
       }
+      // Test if we have received an export request message and
+      // if that's the case handle it now.
+      // This must be done outside of the portion of code protected
+      // by the broker lock so that we keep receiving updates
+      // while we are doing an export and so that a possible
+      // closure of the socket happening when we are publishing the
+      // entries to the remote server can be handled by the listener
+      // thread when it calls this method and therefore the
+      // broker.receive() method.
+      if (initMsg != null)
+      {
+        // Do this work in a separate thread so that the listener thread
+        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
+        exportThread.start();
+      }
     }
     return update;
   }
@@ -1190,10 +1225,9 @@
   @Override
   public void run()
   {
-    /*
-     * create the threads that will wait for incoming changes.
-     */
-    createListeners();
+    // Create the listener thread
+    listenerThread = new ListenerThread(this, updateToReplayQueue);
+    listenerThread.start();
 
     while (shutdown  == false)
     {
@@ -1217,28 +1251,6 @@
   }
 
   /**
-   * create the threads that will wait for incoming changes.
-   * TODO : should use a pool of threads shared between all the servers
-   * TODO : need to make number of thread configurable
-   */
-  private void createListeners()
-  {
-    synchronized (synchroThreads)
-    {
-      if (!shutdown)
-      {
-        synchroThreads.clear();
-        for (int i=0; i<listenerThreadNumber; i++)
-        {
-          ListenerThread myThread = new ListenerThread(this);
-          myThread.start();
-          synchroThreads.add(myThread);
-        }
-      }
-    }
-  }
-
-  /**
    * Shutdown this ReplicationDomain.
    */
   public void shutdown()
@@ -1246,14 +1258,8 @@
     // stop the flush thread
     shutdown = true;
 
-    synchronized (synchroThreads)
-    {
-      // stop the listener threads
-      for (ListenerThread thread : synchroThreads)
-      {
-        thread.shutdown();
-      }
-    }
+    // Stop the listener thread
+    listenerThread.shutdown();
 
     synchronized (this)
     {
@@ -1267,11 +1273,8 @@
     // stop the ReplicationBroker
     broker.stop();
 
-    //  wait for the listener thread to stop
-    for (ListenerThread thread : synchroThreads)
-    {
-      thread.waitForShutdown();
-    }
+    // Wait for the listener thread to stop
+    listenerThread.waitForShutdown();
 
     // wait for completion of the persistentServerState thread.
     try
@@ -1315,140 +1318,148 @@
     int retryCount = 10;
     boolean firstTry = true;
 
-    try
+    // Try to replay the operation, then replay any pending operations whose
+    // dependencies have been cleared, until none are left.
+    do
     {
-      while ((!dependency) && (!done) && (retryCount-- > 0))
+      try
       {
-        op = msg.createOperation(conn);
-
-        op.setInternalOperation(true);
-        op.setSynchronizationOperation(true);
-        changeNumber = OperationContext.getChangeNumber(op);
-        ((AbstractOperation)op).run();
-
-        ResultCode result = op.getResultCode();
-
-        if (result != ResultCode.SUCCESS)
+        while ((!dependency) && (!done) && (retryCount-- > 0))
         {
-          if (op instanceof ModifyOperation)
-          {
-            ModifyOperation newOp = (ModifyOperation) op;
-            dependency = remotePendingChanges.checkDependencies(newOp);
-            if (!dependency)
-            {
-              done = solveNamingConflict(newOp, msg);
-            }
-          }
-          else if (op instanceof DeleteOperation)
-          {
-            DeleteOperation newOp = (DeleteOperation) op;
-            dependency = remotePendingChanges.checkDependencies(newOp);
-            if ((!dependency) && (!firstTry))
-            {
-              done = solveNamingConflict(newOp, msg);
-            }
-          }
-          else if (op instanceof AddOperation)
-          {
-            AddOperation newOp = (AddOperation) op;
-            AddMsg addMsg = (AddMsg) msg;
-            dependency = remotePendingChanges.checkDependencies(newOp);
-            if (!dependency)
-            {
-              done = solveNamingConflict(newOp, addMsg);
-            }
-          }
-          else if (op instanceof ModifyDNOperationBasis)
-          {
-            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
-            dependency = remotePendingChanges.checkDependencies(newMsg);
-            if (!dependency)
-            {
-              ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
-              done = solveNamingConflict(newOp, msg);
-            }
-          }
-          else
-          {
-            done = true;  // unknown type of operation ?!
-          }
-          if (done)
-          {
-            // the update became a dummy update and the result
-            // of the conflict resolution phase is to do nothing.
-            // however we still need to push this change to the serverState
-            updateError(changeNumber);
-          }
-        }
-        else
-        {
-          done = true;
-        }
-        firstTry = false;
-      }
+          op = msg.createOperation(conn);
 
-      if (!done && !dependency)
-      {
-        // Continue with the next change but the servers could now become
-        // inconsistent.
-        // Let the repair tool know about this.
-        Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
+          op.setInternalOperation(true);
+          op.setSynchronizationOperation(true);
+          changeNumber = OperationContext.getChangeNumber(op);
+          ((AbstractOperation) op).run();
+
+          // Check the result of the replayed operation
+          ResultCode result = op.getResultCode();
+
+          if (result != ResultCode.SUCCESS)
+          {
+            if (op instanceof ModifyOperation)
+            {
+              ModifyOperation newOp = (ModifyOperation) op;
+              dependency = remotePendingChanges.checkDependencies(newOp);
+              if (!dependency)
+              {
+                done = solveNamingConflict(newOp, msg);
+              }
+            } else if (op instanceof DeleteOperation)
+            {
+              DeleteOperation newOp = (DeleteOperation) op;
+              dependency = remotePendingChanges.checkDependencies(newOp);
+              if ((!dependency) && (!firstTry))
+              {
+                done = solveNamingConflict(newOp, msg);
+              }
+            } else if (op instanceof AddOperation)
+            {
+              AddOperation newOp = (AddOperation) op;
+              AddMsg addMsg = (AddMsg) msg;
+              dependency = remotePendingChanges.checkDependencies(newOp);
+              if (!dependency)
+              {
+                done = solveNamingConflict(newOp, addMsg);
+              }
+            } else if (op instanceof ModifyDNOperationBasis)
+            {
+              ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+              dependency = remotePendingChanges.checkDependencies(newMsg);
+              if (!dependency)
+              {
+                ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
+                done = solveNamingConflict(newOp, msg);
+              }
+            } else
+            {
+              done = true;  // unknown type of operation ?!
+            }
+            if (done)
+            {
+              // the update became a dummy update and the result
+              // of the conflict resolution phase is to do nothing.
+              // however we still need to push this change to the serverState
+              updateError(changeNumber);
+            }
+          } else
+          {
+            done = true;
+          }
+          firstTry = false;
+        }
+
+        if (!done && !dependency)
+        {
+          // Continue with the next change but the servers could now become
+          // inconsistent.
+          // Let the repair tool know about this.
+          Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
             op.getErrorMessage().toString());
-        logError(message);
-        numUnresolvedNamingConflicts.incrementAndGet();
+          logError(message);
+          numUnresolvedNamingConflicts.incrementAndGet();
 
-        updateError(changeNumber);
-      }
-    }
-    catch (ASN1Exception e)
-    {
-      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-              String.valueOf(msg) + stackTraceToSingleLineString(e));
-      logError(message);
-    }
-    catch (LDAPException e)
-    {
-      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-              String.valueOf(msg) + stackTraceToSingleLineString(e));
-      logError(message);
-    }
-    catch (DataFormatException e)
-    {
-      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-              String.valueOf(msg) + stackTraceToSingleLineString(e));
-      logError(message);
-    }
-    catch (Exception e)
-    {
-      if (changeNumber != null)
-      {
-        /*
-         * An Exception happened during the replay process.
-         * Continue with the next change but the servers will now start
-         * to be inconsistent.
-         * Let the repair tool know about this.
-         */
-        Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
-            stackTraceToSingleLineString(e), op.toString());
-        logError(message);
-        updateError(changeNumber);
-      }
-      else
+          updateError(changeNumber);
+        }
+      } catch (ASN1Exception e)
       {
         Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-                String.valueOf(msg) + stackTraceToSingleLineString(e));
+          String.valueOf(msg) + stackTraceToSingleLineString(e));
         logError(message);
-      }
-    }
-    finally
-    {
-      if (!dependency)
+      } catch (LDAPException e)
       {
-        if (msg.isAssured())
-          ack(msg.getChangeNumber());
-        incProcessedUpdates();
+        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+          String.valueOf(msg) + stackTraceToSingleLineString(e));
+        logError(message);
+      } catch (DataFormatException e)
+      {
+        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+          String.valueOf(msg) + stackTraceToSingleLineString(e));
+        logError(message);
+      } catch (Exception e)
+      {
+        if (changeNumber != null)
+        {
+          /*
+           * An Exception happened during the replay process.
+           * Continue with the next change but the servers will now start
+           * to be inconsistent.
+           * Let the repair tool know about this.
+           */
+          Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
+            stackTraceToSingleLineString(e), op.toString());
+          logError(message);
+          updateError(changeNumber);
+        } else
+        {
+          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+            String.valueOf(msg) + stackTraceToSingleLineString(e));
+          logError(message);
+        }
+      } finally
+      {
+        if (!dependency)
+        {
+          if (msg.isAssured())
+            ack(msg.getChangeNumber());
+          incProcessedUpdates();
+        }
       }
-    }
+
+      // Now replay any pending update that had a dependency and whose
+      // dependency has been replayed; do that until no more such updates
+      // are left.
+      msg = remotePendingChanges.getNextUpdate();
+
+      // Prepare restart of loop
+      done = false;
+      dependency = false;
+      changeNumber = null;
+      retryCount = 10;
+      firstTry = true;
+
+    } while (msg != null);
   }
 
   /**
@@ -2224,7 +2235,7 @@
    * The session to the replication server will be stopped.
    * The domain will not be destroyed but call to the pre-operation
    * methods will result in failure.
-   * The listener threads will be destroyed.
+   * The listener thread will be destroyed.
    * The monitor informations will still be accessible.
    */
   public void disable()
@@ -2232,23 +2243,14 @@
     state.save();
     state.clearInMemory();
     disabled = true;
-    //  stop the listener threads
-    for (ListenerThread thread : synchroThreads)
-    {
-      thread.shutdown();
-    }
-    broker.stop(); // this will cut the session and wake-up the listeners
 
-    for (ListenerThread thread : synchroThreads)
-    {
-      try
-      {
-        thread.join(SHUTDOWN_JOIN_TIMEOUT);
-      } catch (InterruptedException e)
-      {
-        // ignore
-      }
-    }
+    // Stop the listener thread
+    listenerThread.shutdown();
+
+    broker.stop(); // This will cut the session and wake up the listener
+
+    // Wait for the listener thread to stop
+    listenerThread.waitForShutdown();
   }
 
   /**
@@ -2265,7 +2267,6 @@
     state.loadState();
     disabled = false;
 
-
     try
     {
       generationId = loadGenerationId();
@@ -2288,7 +2289,9 @@
 
     broker.start(replicationServers);
 
-    createListeners();
+    // Create the listener thread
+    listenerThread = new ListenerThread(this, updateToReplayQueue);
+    listenerThread.start();
   }
 
   /**
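
The replay() rework above is largely an indentation change around a new outer
loop: replay the incoming message, then keep draining pending changes whose
dependencies were cleared by that replay, until remotePendingChanges returns
nothing more. A standalone sketch of that control flow follows, with
simplified stand-in types (not the actual ReplicationDomain code):

    import java.util.LinkedList;

    public class ReplayLoopSketch
    {
      // Stand-in for RemotePendingChanges: holds changes that were waiting
      // on a dependency and are now ready to be replayed.
      private final LinkedList<String> pendingChanges =
          new LinkedList<String>();

      // Stand-in for remotePendingChanges.getNextUpdate(): next pending
      // change whose dependency is cleared, or null when none is left.
      private String getNextUpdate()
      {
        return pendingChanges.poll();
      }

      private void applyOnce(String msg)
      {
        // conflict resolution, retries and error handling elided
      }

      public void replay(String msg)
      {
        do
        {
          applyOnce(msg);
          // flush pending dependent changes until none are ready
          msg = getNextUpdate();
        } while (msg != null);
      }
    }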
diff --git a/opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java b/opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java
new file mode 100644
index 0000000..ff78d5a
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java
@@ -0,0 +1,73 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import org.opends.server.replication.protocol.UpdateMessage;
+
+/**
+ * This is a bag class to hold an update to replay in the queue of updates to
+ * be replayed by the replay threads.
+ * It associates an update message to replay with the matching
+ * ReplicationDomain.
+ */
+public class UpdateToReplay
+{
+  private UpdateMessage updateMessage = null;
+  private ReplicationDomain replicationDomain = null;
+
+  /**
+   * Construct the object associating the update message with the replication
+   * domain that must be used to replay it (the one it comes from).
+   * @param updateMessage The update message
+   * @param replicationDomain The replication domain to use for replaying the
+   * change from the update message
+   */
+  public UpdateToReplay(UpdateMessage updateMessage,
+    ReplicationDomain replicationDomain)
+  {
+    this.updateMessage = updateMessage;
+    this.replicationDomain = replicationDomain;
+  }
+
+  /**
+   * Getter for update message.
+   * @return The update message
+   */
+  public UpdateMessage getUpdateMessage()
+  {
+    return updateMessage;
+  }
+
+  /**
+   * Getter for replication domain.
+   * @return The replication domain
+   */
+  public ReplicationDomain getReplicationDomain()
+  {
+    return replicationDomain;
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
index be9671e..3f02ee0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -420,7 +420,7 @@
    * To increase the risks of failures a loop of add/del/add is done.
    */
   @SuppressWarnings("unchecked")
-  @Test(enabled=false, groups="slow")
+  @Test(enabled=true, groups="slow")
   public void addDelAddDependencyTest() throws Exception
   {
     ReplicationServer replServer = null;
@@ -552,7 +552,7 @@
    * issuing a set of Add operation followed by a modrdn of the added entry.
    */
   @SuppressWarnings("unchecked")
-  @Test(enabled=false, groups="slow")
+  @Test(enabled=true, groups="slow")
   public void addModdnDependencyTest() throws Exception
   {
     ReplicationServer replServer = null;

--
Gitblit v1.10.0