From eb1275dc2c85f9e3db21bfd5a151d4bc0740d336 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 18 Jan 2008 10:18:24 +0000
Subject: [PATCH] Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |  565 ++++++++++++++++++++++++++++----------------------------
 1 files changed, 284 insertions(+), 281 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 391c238..8e4efd0 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 import static org.opends.messages.ReplicationMessages.*;
@@ -51,6 +51,7 @@
 import java.util.NoSuchElementException;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.Adler32;
 import java.util.zip.CheckedOutputStream;
@@ -172,18 +173,16 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  /**
-   * on shutdown, the server will wait for existing threads to stop
-   * during this timeout (in ms).
-   */
-  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
-
   private ReplicationMonitor monitor;
 
   private ReplicationBroker broker;
-
-  private List<ListenerThread> synchroThreads =
-    new ArrayList<ListenerThread>();
+  // Thread waiting for incoming update messages for this domain and pushing
+  // them to the global incoming update message queue for later processing by
+  // replay threads.
+  private ListenerThread listenerThread;
+  // The update to replay message queue where the listener thread is going to
+  // push incoming update messages.
+  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
   private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
     new TreeMap<ChangeNumber, UpdateMessage>();
   private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
@@ -233,8 +232,6 @@
   // Null when none is being processed.
   private IEContext ieContext = null;
 
-  private int listenerThreadNumber = 10;
-
   private Collection<String> replicationServers;
 
   private DN baseDN;
@@ -355,12 +352,59 @@
   }
 
   /**
+   * This thread is launched when we want to export data to another server that
+   * has requested to be initialized with the data of our backend.
+   */
+  private class ExportThread extends DirectoryThread
+  {
+    // Id of the remote server the export is sent to (the export target)
+    private short target;
+
+    /**
+     * Constructor for the ExportThread.
+     *
+     * @param target Id of the remote server the export is sent to
+     */
+    public ExportThread(short target)
+    {
+      super("Export thread");
+      this.target = target;
+    }
+
+    /**
+     * Runs the export by calling initializeRemote for the target server.
+     */
+    public void run()
+    {
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Export thread starting.");
+      }
+
+      try
+      {
+        initializeRemote(target, target, null);
+      } catch (DirectoryException de)
+      {
+      // Deliberately ignored: initializeRemote is expected to have sent an
+      // error message to the peer already (TODO confirm); nothing to do here
+      }
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Export thread stopping.");
+      }
+    }
+  }
+
+  /**
    * Creates a new ReplicationDomain using configuration from configEntry.
    *
    * @param configuration    The configuration of this ReplicationDomain.
+   * @param updateToReplayQueue The queue for update messages to replay.
    * @throws ConfigException In case of invalid configuration.
    */
-  public ReplicationDomain(ReplicationDomainCfg configuration)
+  public ReplicationDomain(ReplicationDomainCfg configuration,
+    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
     throws ConfigException
   {
     super("replicationDomain_" + configuration.getBaseDN());
@@ -373,6 +417,7 @@
     heartbeatInterval = configuration.getHeartbeatInterval();
     isolationpolicy = configuration.getIsolationPolicy();
     configDn = configuration.dn();
+    this.updateToReplayQueue = updateToReplayQueue;
 
     /*
      * Modify conflicts are solved for all suffixes but the schema suffix
@@ -819,122 +864,112 @@
    */
   public UpdateMessage receive()
   {
-    UpdateMessage update = remotePendingChanges.getNextUpdate();
+    UpdateMessage update = null;
 
-    if (update == null)
+    while (update == null)
     {
-      while (update == null)
+      InitializeRequestMessage initMsg = null;
+      synchronized (broker)
       {
-        InitializeRequestMessage initMsg = null;
-        synchronized (broker)
+        ReplicationMessage msg;
+        try
         {
-          ReplicationMessage msg;
-          try
+          msg = broker.receive();
+          if (msg == null)
           {
-            msg = broker.receive();
-            if (msg == null)
-            {
-              // The server is in the shutdown process
-              return null;
-            }
+            // The server is in the shutdown process
+            return null;
+          }
 
-            if (debugEnabled())
-              if (!(msg instanceof HeartbeatMessage))
-                TRACER.debugVerbose("Message received <" + msg + ">");
+          if (debugEnabled())
+            if (!(msg instanceof HeartbeatMessage))
+              TRACER.debugVerbose("Message received <" + msg + ">");
 
-            if (msg instanceof AckMessage)
-            {
-              AckMessage ack = (AckMessage) msg;
-              receiveAck(ack);
+          if (msg instanceof AckMessage)
+          {
+            AckMessage ack = (AckMessage) msg;
+            receiveAck(ack);
             }
             else if (msg instanceof InitializeRequestMessage)
-            {
-              // Another server requests us to provide entries
-              // for a total update
+          {
+            // Another server requests us to provide entries
+            // for a total update
               initMsg = (InitializeRequestMessage)msg;
             }
             else if (msg instanceof InitializeTargetMessage)
-            {
-              // Another server is exporting its entries to us
-              InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+          {
+            // Another server is exporting its entries to us
+            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
 
-              try
-              {
-                // This must be done while we are still holding the
-                // broker lock because we are now going to receive a
-                // bunch of entries from the remote server and we
-                // want the import thread to catch them and
-                // not the ListenerThread.
-                initialize(importMsg);
+            try
+            {
+              // This must be done while we are still holding the
+              // broker lock because we are now going to receive a
+              // bunch of entries from the remote server and we
+              // want the import thread to catch them and
+              // not the ListenerThread.
+              initialize(importMsg);
               }
               catch(DirectoryException de)
-              {
-                // Returns an error message to notify the sender
-                ErrorMessage errorMsg =
-                  new ErrorMessage(importMsg.getsenderID(),
-                                   de.getMessageObject());
-                MessageBuilder mb = new MessageBuilder();
-                mb.append(de.getMessageObject());
-                TRACER.debugInfo(Message.toString(mb.toMessage()));
-                broker.publish(errorMsg);
-              }
+            {
+              // Returns an error message to notify the sender
+              ErrorMessage errorMsg =
+                new ErrorMessage(importMsg.getsenderID(),
+                de.getMessageObject());
+              MessageBuilder mb = new MessageBuilder();
+              mb.append(de.getMessageObject());
+              TRACER.debugInfo(Message.toString(mb.toMessage()));
+              broker.publish(errorMsg);
+            }
             }
             else if (msg instanceof ErrorMessage)
+          {
+            if (ieContext != null)
             {
-              if (ieContext != null)
-              {
-                // This is an error termination for the 2 following cases :
-                // - either during an export
-                // - or before an import really started
-                //   For example, when we publish a request and the
-                //  replicationServer did not find any import source.
+              // This is an error termination for the 2 following cases :
+              // - either during an export
+              // - or before an import really started
+              //   For example, when we publish a request and the
+              //  replicationServer did not find any import source.
                 abandonImportExport((ErrorMessage)msg);
               }
               else
-              {
-                /* We can receive an error message from the replication server
-                 * in the following cases :
-                 * - we connected with an incorrect generation id
-                 */
+            {
+              /* We can receive an error message from the replication server
+               * in the following cases :
+               * - we connected with an incorrect generation id
+               */
                 ErrorMessage errorMsg = (ErrorMessage)msg;
-                logError(ERR_ERROR_MSG_RECEIVED.get(
-                           errorMsg.getDetails()));
-              }
+              logError(ERR_ERROR_MSG_RECEIVED.get(
+                errorMsg.getDetails()));
+            }
             }
             else if (msg instanceof UpdateMessage)
-            {
-              update = (UpdateMessage) msg;
-              receiveUpdate(update);
-            }
+          {
+            update = (UpdateMessage) msg;
+            receiveUpdate(update);
+          }
           }
           catch (SocketTimeoutException e)
-          {
-            // just retry
-          }
-        }
-        // Test if we have received and export request message and
-        // if that's the case handle it now.
-        // This must be done outside of the portion of code protected
-        // by the broker lock so that we keep receiveing update
-        // when we are doing and export and so that a possible
-        // closure of the socket happening when we are publishing the
-        // entries to the remote can be handled by the other
-        // ListenerThread when they call this method and therefore the
-        // broker.receive() method.
-        if (initMsg != null)
         {
-          try
-          {
-            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
-                null);
-          }
-          catch(DirectoryException de)
-          {
-            // An error message has been sent to the peer
-            // Nothing more to do locally
-          }
+        // just retry
         }
       }
+      // Test if we have received an export request message and
+      // if that's the case handle it now.
+      // This must be done outside of the portion of code protected
+      // by the broker lock so that we keep receiving updates
+      // while we are doing an export and so that a possible
+      // closure of the socket happening while we are publishing the
+      // entries to the remote can be handled by the other
+      // replay threads when they call this method and therefore the
+      // broker.receive() method.
+      if (initMsg != null)
+      {
+        // Do this work in a thread to let the replay thread continue working
+        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
+        exportThread.start();
+      }
     }
     return update;
   }
@@ -1190,10 +1225,9 @@
   @Override
   public void run()
   {
-    /*
-     * create the threads that will wait for incoming changes.
-     */
-    createListeners();
+    // Create the listener thread
+    listenerThread = new ListenerThread(this, updateToReplayQueue);
+    listenerThread.start();
 
     while (shutdown  == false)
     {
@@ -1217,28 +1251,6 @@
   }
 
   /**
-   * create the threads that will wait for incoming changes.
-   * TODO : should use a pool of threads shared between all the servers
-   * TODO : need to make number of thread configurable
-   */
-  private void createListeners()
-  {
-    synchronized (synchroThreads)
-    {
-      if (!shutdown)
-      {
-        synchroThreads.clear();
-        for (int i=0; i<listenerThreadNumber; i++)
-        {
-          ListenerThread myThread = new ListenerThread(this);
-          myThread.start();
-          synchroThreads.add(myThread);
-        }
-      }
-    }
-  }
-
-  /**
    * Shutdown this ReplicationDomain.
    */
   public void shutdown()
@@ -1246,14 +1258,8 @@
     // stop the flush thread
     shutdown = true;
 
-    synchronized (synchroThreads)
-    {
-      // stop the listener threads
-      for (ListenerThread thread : synchroThreads)
-      {
-        thread.shutdown();
-      }
-    }
+    // Stop the listener thread
+    listenerThread.shutdown();
 
     synchronized (this)
     {
@@ -1267,11 +1273,8 @@
     // stop the ReplicationBroker
     broker.stop();
 
-    //  wait for the listener thread to stop
-    for (ListenerThread thread : synchroThreads)
-    {
-      thread.waitForShutdown();
-    }
+    // Wait for the listener thread to stop
+    listenerThread.waitForShutdown();
 
     // wait for completion of the persistentServerState thread.
     try
@@ -1315,140 +1318,148 @@
     int retryCount = 10;
     boolean firstTry = true;
 
-    try
+    // Try to replay the operation, then flush (by replaying) any pending
+    // operations whose dependencies have been replayed, until none are left.
+    do
     {
-      while ((!dependency) && (!done) && (retryCount-- > 0))
+      try
       {
-        op = msg.createOperation(conn);
-
-        op.setInternalOperation(true);
-        op.setSynchronizationOperation(true);
-        changeNumber = OperationContext.getChangeNumber(op);
-        ((AbstractOperation)op).run();
-
-        ResultCode result = op.getResultCode();
-
-        if (result != ResultCode.SUCCESS)
+        while ((!dependency) && (!done) && (retryCount-- > 0))
         {
-          if (op instanceof ModifyOperation)
-          {
-            ModifyOperation newOp = (ModifyOperation) op;
-            dependency = remotePendingChanges.checkDependencies(newOp);
-            if (!dependency)
-            {
-              done = solveNamingConflict(newOp, msg);
-            }
-          }
-          else if (op instanceof DeleteOperation)
-          {
-            DeleteOperation newOp = (DeleteOperation) op;
-            dependency = remotePendingChanges.checkDependencies(newOp);
-            if ((!dependency) && (!firstTry))
-            {
-              done = solveNamingConflict(newOp, msg);
-            }
-          }
-          else if (op instanceof AddOperation)
-          {
-            AddOperation newOp = (AddOperation) op;
-            AddMsg addMsg = (AddMsg) msg;
-            dependency = remotePendingChanges.checkDependencies(newOp);
-            if (!dependency)
-            {
-              done = solveNamingConflict(newOp, addMsg);
-            }
-          }
-          else if (op instanceof ModifyDNOperationBasis)
-          {
-            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
-            dependency = remotePendingChanges.checkDependencies(newMsg);
-            if (!dependency)
-            {
-              ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
-              done = solveNamingConflict(newOp, msg);
-            }
-          }
-          else
-          {
-            done = true;  // unknown type of operation ?!
-          }
-          if (done)
-          {
-            // the update became a dummy update and the result
-            // of the conflict resolution phase is to do nothing.
-            // however we still need to push this change to the serverState
-            updateError(changeNumber);
-          }
-        }
-        else
-        {
-          done = true;
-        }
-        firstTry = false;
-      }
+          op = msg.createOperation(conn);
 
-      if (!done && !dependency)
-      {
-        // Continue with the next change but the servers could now become
-        // inconsistent.
-        // Let the repair tool know about this.
-        Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
+          op.setInternalOperation(true);
+          op.setSynchronizationOperation(true);
+          changeNumber = OperationContext.getChangeNumber(op);
+          ((AbstractOperation) op).run();
+
+          // Check the result of the replay attempt
+          ResultCode result = op.getResultCode();
+
+          if (result != ResultCode.SUCCESS)
+          {
+            if (op instanceof ModifyOperation)
+            {
+              ModifyOperation newOp = (ModifyOperation) op;
+              dependency = remotePendingChanges.checkDependencies(newOp);
+              if (!dependency)
+              {
+                done = solveNamingConflict(newOp, msg);
+              }
+            } else if (op instanceof DeleteOperation)
+            {
+              DeleteOperation newOp = (DeleteOperation) op;
+              dependency = remotePendingChanges.checkDependencies(newOp);
+              if ((!dependency) && (!firstTry))
+              {
+                done = solveNamingConflict(newOp, msg);
+              }
+            } else if (op instanceof AddOperation)
+            {
+              AddOperation newOp = (AddOperation) op;
+              AddMsg addMsg = (AddMsg) msg;
+              dependency = remotePendingChanges.checkDependencies(newOp);
+              if (!dependency)
+              {
+                done = solveNamingConflict(newOp, addMsg);
+              }
+            } else if (op instanceof ModifyDNOperationBasis)
+            {
+              ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+              dependency = remotePendingChanges.checkDependencies(newMsg);
+              if (!dependency)
+              {
+                ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
+                done = solveNamingConflict(newOp, msg);
+              }
+            } else
+            {
+              done = true;  // unknown type of operation ?!
+            }
+            if (done)
+            {
+              // the update became a dummy update and the result
+              // of the conflict resolution phase is to do nothing.
+              // however we still need to push this change to the serverState
+              updateError(changeNumber);
+            }
+          } else
+          {
+            done = true;
+          }
+          firstTry = false;
+        }
+
+        if (!done && !dependency)
+        {
+          // Continue with the next change but the servers could now become
+          // inconsistent.
+          // Let the repair tool know about this.
+          Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
             op.getErrorMessage().toString());
-        logError(message);
-        numUnresolvedNamingConflicts.incrementAndGet();
+          logError(message);
+          numUnresolvedNamingConflicts.incrementAndGet();
 
-        updateError(changeNumber);
-      }
-    }
-    catch (ASN1Exception e)
-    {
-      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-              String.valueOf(msg) + stackTraceToSingleLineString(e));
-      logError(message);
-    }
-    catch (LDAPException e)
-    {
-      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-              String.valueOf(msg) + stackTraceToSingleLineString(e));
-      logError(message);
-    }
-    catch (DataFormatException e)
-    {
-      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-              String.valueOf(msg) + stackTraceToSingleLineString(e));
-      logError(message);
-    }
-    catch (Exception e)
-    {
-      if (changeNumber != null)
-      {
-        /*
-         * An Exception happened during the replay process.
-         * Continue with the next change but the servers will now start
-         * to be inconsistent.
-         * Let the repair tool know about this.
-         */
-        Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
-            stackTraceToSingleLineString(e), op.toString());
-        logError(message);
-        updateError(changeNumber);
-      }
-      else
+          updateError(changeNumber);
+        }
+      } catch (ASN1Exception e)
       {
         Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-                String.valueOf(msg) + stackTraceToSingleLineString(e));
+          String.valueOf(msg) + stackTraceToSingleLineString(e));
         logError(message);
-      }
-    }
-    finally
-    {
-      if (!dependency)
+      } catch (LDAPException e)
       {
-        if (msg.isAssured())
-          ack(msg.getChangeNumber());
-        incProcessedUpdates();
+        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+          String.valueOf(msg) + stackTraceToSingleLineString(e));
+        logError(message);
+      } catch (DataFormatException e)
+      {
+        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+          String.valueOf(msg) + stackTraceToSingleLineString(e));
+        logError(message);
+      } catch (Exception e)
+      {
+        if (changeNumber != null)
+        {
+          /*
+           * An Exception happened during the replay process.
+           * Continue with the next change but the servers will now start
+           * to be inconsistent.
+           * Let the repair tool know about this.
+           */
+          Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
+            stackTraceToSingleLineString(e), op.toString());
+          logError(message);
+          updateError(changeNumber);
+        } else
+        {
+          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+            String.valueOf(msg) + stackTraceToSingleLineString(e));
+          logError(message);
+        }
+      } finally
+      {
+        if (!dependency)
+        {
+          if (msg.isAssured())
+            ack(msg.getChangeNumber());
+          incProcessedUpdates();
+        }
       }
-    }
+
+      // Now replay any pending update that had a dependency and whose
+      // dependency has been replayed; repeat until no more updates of that
+      // type are left.
+      msg = remotePendingChanges.getNextUpdate();
+
+      // Prepare restart of loop
+      done = false;
+      dependency = false;
+      changeNumber = null;
+      retryCount = 10;
+      firstTry = true;
+
+    } while (msg != null);
   }
 
   /**
@@ -2224,7 +2235,7 @@
    * The session to the replication server will be stopped.
    * The domain will not be destroyed but call to the pre-operation
    * methods will result in failure.
-   * The listener threads will be destroyed.
+   * The listener thread will be destroyed.
    * The monitor informations will still be accessible.
    */
   public void disable()
@@ -2232,23 +2243,14 @@
     state.save();
     state.clearInMemory();
     disabled = true;
-    //  stop the listener threads
-    for (ListenerThread thread : synchroThreads)
-    {
-      thread.shutdown();
-    }
-    broker.stop(); // this will cut the session and wake-up the listeners
 
-    for (ListenerThread thread : synchroThreads)
-    {
-      try
-      {
-        thread.join(SHUTDOWN_JOIN_TIMEOUT);
-      } catch (InterruptedException e)
-      {
-        // ignore
-      }
-    }
+    // Stop the listener thread
+    listenerThread.shutdown();
+
+    broker.stop(); // This will cut the session and wake up the listener
+
+    // Wait for the listener thread to stop
+    listenerThread.waitForShutdown();
   }
 
   /**
@@ -2265,7 +2267,6 @@
     state.loadState();
     disabled = false;
 
-
     try
     {
       generationId = loadGenerationId();
@@ -2288,7 +2289,9 @@
 
     broker.start(replicationServers);
 
-    createListeners();
+    // Create the listener thread
+    listenerThread = new ListenerThread(this, updateToReplayQueue);
+    listenerThread.start();
   }
 
   /**

--
Gitblit v1.10.0