From a6be5db964ffa77a68b91966d99f6fa0b36b532e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 29 May 2007 09:27:46 +0000
Subject: [PATCH] Fix for 1561 : ReplicationDomain.disable() should wait for all threads to die before returning

---
 opends/src/server/org/opends/server/messages/ReplicationMessages.java         |   58 +++++++---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |    2 
 opends/src/server/org/opends/server/replication/protocol/SocketSession.java   |   14 ++
 opends/src/server/org/opends/server/replication/server/ServerWriter.java      |   33 ++++++
 opends/src/server/org/opends/server/replication/plugin/ListenerThread.java    |   47 +++++++--
 opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java |   27 ++++
 opends/src/server/org/opends/server/replication/server/ServerHandler.java     |   30 +++++
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |   23 ++++
 opends/src/server/org/opends/server/replication/server/ServerReader.java      |   63 ++++++++++--
 9 files changed, 250 insertions(+), 47 deletions(-)

diff --git a/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 48f7704e..68bcf69 100644
--- a/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -368,6 +368,22 @@
     CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 51;
 
   /**
+   * A replication server received a null message from
+   * another server.
+   */
+  public static final int MSGID_READER_NULL_MSG =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 52;
+
+  /**
+   * An exception was caught while reading messages from another
+   * server (the message ID is registered with notice severity).
+   */
+  public static final int MSGID_READER_EXCEPTION =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53;
+
+
+
+  /**
    * Register the messages from this class in the core server.
    *
    */
@@ -376,7 +392,7 @@
     registerMessage(MSGID_SYNC_INVALID_DN,
        "The configured DN is already used by another domain");
     registerMessage(MSGID_INVALID_CHANGELOG_SERVER,
-        "Invalid changelog server configuration");
+        "Invalid replication server configuration");
     registerMessage(MSGID_UNKNOWN_HOSTNAME,
         "Changelog failed to start because the hostname is unknown");
     registerMessage(MSGID_COULD_NOT_BIND_CHANGELOG,
@@ -398,31 +414,31 @@
     registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION,
          "An Exception was caught while replaying operation %s : %s");
     registerMessage(MSGID_NEED_CHANGELOG_PORT,
-         "The Changelog server port must be defined");
+         "The replication server port must be defined");
     registerMessage(MSGID_ERROR_UPDATING_RUV,
          "Error %s when updating server state %s : %s base dn : %s");
     registerMessage(MSGID_ERROR_SEARCHING_RUV,
          "Error %s when searching for server state %s : %s base dn : %s");
     registerMessage(MSGID_SERVER_DISCONNECT,
-         "%s has disconnected from this changelog server");
+         "%s has disconnected from this replication server");
     registerMessage(MSGID_NO_CHANGELOG_SERVER_LISTENING,
-         "There is no changelog server listening on %s");
+         "There is no replication server listening on %s");
     registerMessage(MSGID_CHANGELOG_MISSING_CHANGES,
-        "The changelog server %s is missing some changes that this server" +
+        "The replication server %s is missing some changes that this server" +
         " has already processed");
     registerMessage(MSGID_NEED_MORE_THAN_ONE_CHANGELOG_SERVER,
-        "More than one changelog server should be configured");
+        "More than one replication server should be configured");
     registerMessage(MSGID_EXCEPTION_STARTING_SESSION,
         "Caught Exception during initial communication with " +
-        "changelog server : ");
+        "replication server : ");
     registerMessage(MSGID_CANNOT_RECOVER_CHANGES,
         "Error when searching old changes from the database. ");
     registerMessage(
         MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES,
-        "Could not find a changelog server that has seen all the local" +
+        "Could not find a replication server that has seen all the local" +
         " changes. Going to replay changes");
     registerMessage(MSGID_COULD_NOT_FIND_CHANGELOG,
-        "Could not connect to any changelog server, retrying...");
+        "Could not connect to any replication server, retrying...");
     registerMessage(MSGID_EXCEPTION_CLOSING_DATABASE,
         "Error closing changelog database %s : ");
     registerMessage(MSGID_EXCEPTION_DECODING_OPERATION,
@@ -458,33 +474,33 @@
         "An Exception was caught while testing existence or trying " +
         " to create the directory for the changelog database : %s");
     registerMessage(MSGID_CHANGELOG_SERVER_ATTR,
-        "Specifies the list of Changelog Servers to which this" +
-        " Changelog Server should connect. Each value of this attribute" +
+        "Specifies the list of replication servers to which this" +
+        " replication server should connect. Each value of this attribute" +
         " should contain a values build with the hostname and the port" +
         " number of the remote server separated with a \":\"");
     registerMessage(MSGID_SERVER_ID_ATTR,
-        "Specifies the server ID. Each Changelog Server in the topology" +
+        "Specifies the server ID. Each replication server in the topology" +
         " Must be assigned a unique server ID in the topology");
     registerMessage(MSGID_CHANGELOG_PORT_ATTR,
-        "Specifies the port number that the changelog server will use to" +
+        "Specifies the port number that the replication server will use to" +
         " listen for connections from LDAP servers");
     registerMessage(MSGID_WINDOW_SIZE_ATTR,
-        "Specifies the receive window size of the changelog server");
+        "Specifies the receive window size of the replication server");
     registerMessage(MSGID_QUEUE_SIZE_ATTR,
-        "Specifies the receive queue size of the changelog server." +
-        " The Changelog servers will queue up to this number of messages" +
+        "Specifies the receive queue size of the replication server." +
+        " The replication servers will queue up to this number of messages" +
         " in its memory queue and save the older messages to persistent" +
         " storage. Using a larger size may improve performances when" +
         " The replication delay is larger than this size but at the cost" +
         " of using more memory");
     registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR,
-        "Specifies the Changelog Server directory. The Changelog server" +
+        "Specifies the replication server directory. The replication server" +
         " will create all persistent storage below this path");
     registerMessage(MSGID_PURGE_DELAY_ATTR,
-        "Specifies the Changelog Purge Delay, The Changelog servers will" +
+        "Specifies the Changelog Purge Delay, The replication servers will" +
         " keep all changes up to this amount of time before deleting them." +
         " This values defines the maximum age of a backup that can be" +
-        " restored because changelog servers would not be able to refresh" +
+        " restored because replication servers would not be able to refresh" +
         " LDAP servers with older versions of the data. A zero value" +
         " can be used to specify an infinite delay (or never purge)");
     registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
@@ -504,5 +520,9 @@
         "The provider class does not allow the operation requested");
     registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME,
         "The hostname %s could not be resolved as an IP address");
+    registerMessage(MSGID_READER_NULL_MSG,
+        "Received a Null Msg from %s");
+    registerMessage(MSGID_READER_EXCEPTION,
+        "Exception when reading messages from %s");
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
index 5300d58..1e626cf 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -27,11 +27,14 @@
 package org.opends.server.replication.plugin;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
@@ -42,6 +45,11 @@
  */
 public class ListenerThread extends DirectoryThread
 {
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
   private ReplicationDomain listener;
   private boolean shutdown = false;
 
@@ -70,23 +78,38 @@
   public void run()
   {
     UpdateMessage msg;
+    boolean done = false;
 
-    try
+    if (debugEnabled())
     {
-      while (((msg = listener.receive()) != null) && (shutdown == false))
+      TRACER.debugInfo("Replication Listener thread starting.");
+    }
+
+    while (!done)
+    {
+      try
       {
-        listener.replay(msg);
+        while (((msg = listener.receive()) != null) && (shutdown == false))
+        {
+          listener.replay(msg);
+        }
+        done = true;
+      } catch (Exception e)
+      {
+        /*
+         * catch all exceptions happening in listener.receive and
+         * listener.replay so that the thread never dies even in case
+         * of problems.
+         */
+        int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE;
+        String message = getMessage(msgID, stackTraceToSingleLineString(e));
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
       }
-    } catch (Exception e)
+    }
+    if (debugEnabled())
     {
-      /*
-       * catch all exceptions happening in listener.receive and listener.replay
-       * so that the thread never dies even in case of problems.
-       */
-      int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE;
-      String message = getMessage(msgID, stackTraceToSingleLineString(e));
-      logError(ErrorLogCategory.SYNCHRONIZATION,
-          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+      TRACER.debugInfo("Replication Listener thread stopping.");
     }
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a27dc96..a67fdc3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,7 +207,7 @@
     }
 
     boolean checkState = true;
-    while( !connected)
+    while ((!connected) && (!shutdown))
     {
       for (String server : servers)
       {
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 985d2f0..e601e1f 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -126,6 +126,12 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /**
+   * on shutdown, the server will wait for existing threads to stop
+   * during this timeout (in ms).
+   */
+  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
+
   private ReplicationMonitor monitor;
 
   private ReplicationBroker broker;
@@ -1047,6 +1053,12 @@
 
     // stop the ReplicationBroker
     broker.stop();
+
+    //  wait for the listener thread to stop
+    for (ListenerThread thread : synchroThreads)
+    {
+      thread.shutdown();
+    }
   }
 
   /**
@@ -1745,6 +1757,17 @@
       thread.shutdown();
     }
     broker.stop(); // this will cut the session and wake-up the listeners
+
+    for (ListenerThread thread : synchroThreads)
+    {
+      try
+      {
+        thread.join(SHUTDOWN_JOIN_TIMEOUT);
+      } catch (InterruptedException e)
+      {
+        // ignore
+      }
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 9723814..c246b4c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -29,6 +29,7 @@
 
 import org.opends.server.api.DirectoryThread;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.debug.DebugTracer;
 
 import java.io.IOException;
@@ -66,7 +67,7 @@
   /**
    * Set this to stop the thread.
    */
-  private boolean shutdown = false;
+  private Boolean shutdown = false;
 
 
   /**
@@ -133,7 +134,14 @@
           {
             TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
           }
-          Thread.sleep(sleepTime);
+
+          synchronized (shutdown)
+          {
+            if (!shutdown)
+            {
+              shutdown.wait(sleepTime);
+            }
+          }
         }
         catch (InterruptedException e)
         {
@@ -161,10 +169,23 @@
 
   /**
    * Call this method to stop the thread.
+   * This method only signals the thread; it does not wait for it to stop.
    */
   public void shutdown()
   {
-    shutdown = true;
+    synchronized (shutdown)
+    {
+      shutdown.notifyAll();
+      shutdown = true;
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Going to notify Heartbeat thread.");
+      }
+    }
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("Returning from Heartbeat shutdown.");
+    }
   }
 
 
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 0c53005..7d0a90f 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -26,6 +26,9 @@
  */
 package org.opends.server.replication.protocol;
 
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -33,6 +36,8 @@
 import java.net.SocketException;
 import java.util.zip.DataFormatException;
 
+import org.opends.server.loggers.debug.DebugTracer;
+
 /**
  * This class Implement a protocol session using a basic socket and relying on
  * the innate encoding/decoding capabilities of the ReplicationMessage
@@ -44,6 +49,11 @@
  */
 public class SocketSession implements ProtocolSession
 {
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
   private Socket socket;
   private InputStream input;
   private OutputStream output;
@@ -83,6 +93,10 @@
    */
   public void close() throws IOException
   {
+    if (debugEnabled())
+    {
+      TRACER.debugVerbose("Closing SocketSession.");
+    }
     socket.close();
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a098c4b..c9d2835 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -28,6 +28,7 @@
 
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.messages.ReplicationMessages.*;
@@ -79,6 +80,12 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /**
+   * Time (in ms) during which the server will wait for existing threads
+   * to stop during the shutdown.
+   */
+  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
+
   private short serverId;
   private ProtocolSession session;
   private final MsgQueue msgQueue = new MsgQueue();
@@ -747,7 +754,7 @@
   private UpdateMessage getnextMessage()
   {
     UpdateMessage msg;
-    do
+    while (active == true)
     {
       if (following == false)
       {
@@ -884,7 +891,7 @@
        * the first check at the beginning of this method
        * and the second check just above.
        */
-    } while (active == true);
+    }
     return null;
   }
 
@@ -905,6 +912,15 @@
   public void stopHandler()
   {
     active = false;
+
+    try
+    {
+      session.close();
+    } catch (IOException e)
+    {
+      // ignore.
+    }
+
     synchronized (msgQueue)
     {
       /* wake up the writer thread on an empty queue so that it disappear */
@@ -1218,7 +1234,17 @@
     {
       // Service is closing.
     }
+
     stopHandler();
+
+    try
+    {
+      writer.join(SHUTDOWN_JOIN_TIMEOUT);
+      reader.join(SHUTDOWN_JOIN_TIMEOUT);
+    } catch (InterruptedException e)
+    {
+      // don't try anymore to join and return.
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 3c72463..41fb483 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -29,6 +29,8 @@
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
 
 import java.io.IOException;
 
@@ -45,6 +47,7 @@
 import org.opends.server.replication.protocol.WindowMessage;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.loggers.debug.DebugTracer;
 
 
 /**
@@ -59,6 +62,11 @@
  */
 public class ServerReader extends DirectoryThread
 {
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
   private short serverId;
   private ProtocolSession session;
   private ServerHandler handler;
@@ -87,8 +95,18 @@
    */
   public void run()
   {
+    if (debugEnabled())
+    {
+      if (handler.isReplicationServer())
+      {
+        TRACER.debugInfo("Replication server reader starting " + serverId);
+      }
+      else
+      {
+        TRACER.debugInfo("LDAP server reader starting " + serverId);
+      }
+    }
     /*
-     * TODO : catch exceptions in case of bugs
      * wait on input stream
      * grab all incoming messages and publish them to the replicationCache
      */
@@ -98,12 +116,6 @@
       {
         ReplicationMessage msg = session.receive();
 
-        if (msg == null)
-        {
-          // TODO : generate error in the log
-          // make sure that connection is closed
-          return;
-        }
         if (msg instanceof AckMessage)
         {
           AckMessage ack = (AckMessage) msg;
@@ -147,7 +159,19 @@
           ErrorMessage errorMsg = (ErrorMessage) msg;
           handler.process(errorMsg);
         }
-
+        else if (msg == null)
+        {
+          /*
+           * The remote server has sent a null message,
+           * close the connection.
+           */
+          int    msgID   = MSGID_READER_NULL_MSG;
+          String message = getMessage(msgID, handler.toString());
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+                   ErrorLogSeverity.SEVERE_ERROR,
+                   message, msgID);
+          return;
+        }
       }
     } catch (IOException e)
     {
@@ -160,7 +184,7 @@
       String message = getMessage(msgID, handler.toString());
       logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
-               message, msgID);
+               message + e.getMessage(), msgID);
     } catch (ClassNotFoundException e)
     {
       /*
@@ -174,7 +198,15 @@
                message, msgID);
     } catch (Exception e)
     {
-
+      /*
+       * An unexpected exception was caught while reading messages,
+       * close the connection.
+       */
+      int    msgID   = MSGID_READER_EXCEPTION;
+      String message = getMessage(msgID, handler.toString());
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+               ErrorLogSeverity.SEVERE_ERROR,
+               message, msgID);
     }
     finally
     {
@@ -192,5 +224,16 @@
       }
       replicationCache.stopServer(handler);
     }
+    if (debugEnabled())
+    {
+      if (handler.isReplicationServer())
+      {
+        TRACER.debugInfo("Replication server reader stopping " + serverId);
+      }
+      else
+      {
+        TRACER.debugInfo("LDAP server reader stopping " + serverId);
+      }
+    }
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 5344e20..6018ed9 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -27,6 +27,8 @@
 package org.opends.server.replication.server;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.messages.ReplicationMessages.*;
 
@@ -35,6 +37,7 @@
 import java.util.NoSuchElementException;
 
 import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.types.ErrorLogCategory;
@@ -47,9 +50,15 @@
  */
 public class ServerWriter extends DirectoryThread
 {
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
   private ProtocolSession session;
   private ServerHandler handler;
   private ReplicationCache replicationCache;
+  private short serverId;
 
   /**
    * Create a ServerWriter.
@@ -66,6 +75,7 @@
   {
     super(handler.toString() + " writer");
 
+    this.serverId = serverId;
     this.session = session;
     this.handler = handler;
     this.replicationCache = replicationCache;
@@ -78,6 +88,17 @@
    */
   public void run()
   {
+    if (debugEnabled())
+    {
+      if (handler.isReplicationServer())
+      {
+        TRACER.debugInfo("Replication server writer starting " + serverId);
+      }
+      else
+      {
+        TRACER.debugInfo("LDAP server writer starting " + serverId);
+      }
+    }
     try {
       while (true)
       {
@@ -132,6 +153,18 @@
        // Can't do much more : ignore
       }
       replicationCache.stopServer(handler);
+
+      if (debugEnabled())
+      {
+        if (handler.isReplicationServer())
+        {
+          TRACER.debugInfo("Replication server writer stopping " + serverId);
+        }
+        else
+        {
+          TRACER.debugInfo("LDAP server writer stopping " + serverId);
+        }
+      }
     }
   }
 }

--
Gitblit v1.10.0