mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
29.27.2007 a6be5db964ffa77a68b91966d99f6fa0b36b532e
Fix for 1561 : ReplicationDomain.disable() should wait for all threads to die be fore returning

Done for both the replication plugin and replication server shutdown.

Also added some debug logging around thread creation and stop.
9 files modified
297 ■■■■ changed files
opends/src/server/org/opends/server/messages/ReplicationMessages.java 58 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java 47 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 23 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/SocketSession.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 30 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 63 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 33 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -368,6 +368,22 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 51;
  /**
   * A replication server received a null messsage from
   * another server.
   */
  public static final int MSGID_READER_NULL_MSG =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 52;
  /**
   * A server disconnected from the replication server.
   * (this is an informational message)
   */
  public static final int MSGID_READER_EXCEPTION =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -376,7 +392,7 @@
    registerMessage(MSGID_SYNC_INVALID_DN,
       "The configured DN is already used by another domain");
    registerMessage(MSGID_INVALID_CHANGELOG_SERVER,
        "Invalid changelog server configuration");
        "Invalid replication server configuration");
    registerMessage(MSGID_UNKNOWN_HOSTNAME,
        "Changelog failed to start because the hostname is unknown");
    registerMessage(MSGID_COULD_NOT_BIND_CHANGELOG,
@@ -398,31 +414,31 @@
    registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION,
         "An Exception was caught while replaying operation %s : %s");
    registerMessage(MSGID_NEED_CHANGELOG_PORT,
         "The Changelog server port must be defined");
         "The replication server port must be defined");
    registerMessage(MSGID_ERROR_UPDATING_RUV,
         "Error %s when updating server state %s : %s base dn : %s");
    registerMessage(MSGID_ERROR_SEARCHING_RUV,
         "Error %s when searching for server state %s : %s base dn : %s");
    registerMessage(MSGID_SERVER_DISCONNECT,
         "%s has disconnected from this changelog server");
         "%s has disconnected from this replication server");
    registerMessage(MSGID_NO_CHANGELOG_SERVER_LISTENING,
         "There is no changelog server listening on %s");
         "There is no replication server listening on %s");
    registerMessage(MSGID_CHANGELOG_MISSING_CHANGES,
        "The changelog server %s is missing some changes that this server" +
        "The replication server %s is missing some changes that this server" +
        " has already processed");
    registerMessage(MSGID_NEED_MORE_THAN_ONE_CHANGELOG_SERVER,
        "More than one changelog server should be configured");
        "More than one replication server should be configured");
    registerMessage(MSGID_EXCEPTION_STARTING_SESSION,
        "Caught Exception during initial communication with " +
        "changelog server : ");
        "replication server : ");
    registerMessage(MSGID_CANNOT_RECOVER_CHANGES,
        "Error when searching old changes from the database. ");
    registerMessage(
        MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES,
        "Could not find a changelog server that has seen all the local" +
        "Could not find a replication server that has seen all the local" +
        " changes. Going to replay changes");
    registerMessage(MSGID_COULD_NOT_FIND_CHANGELOG,
        "Could not connect to any changelog server, retrying...");
        "Could not connect to any replication server, retrying...");
    registerMessage(MSGID_EXCEPTION_CLOSING_DATABASE,
        "Error closing changelog database %s : ");
    registerMessage(MSGID_EXCEPTION_DECODING_OPERATION,
@@ -458,33 +474,33 @@
        "An Exception was caught while testing existence or trying " +
        " to create the directory for the changelog database : %s");
    registerMessage(MSGID_CHANGELOG_SERVER_ATTR,
        "Specifies the list of Changelog Servers to which this" +
        " Changelog Server should connect. Each value of this attribute" +
        "Specifies the list of replication servers to which this" +
        " replication server should connect. Each value of this attribute" +
        " should contain a values build with the hostname and the port" +
        " number of the remote server separated with a \":\"");
    registerMessage(MSGID_SERVER_ID_ATTR,
        "Specifies the server ID. Each Changelog Server in the topology" +
        "Specifies the server ID. Each replication server in the topology" +
        " Must be assigned a unique server ID in the topology");
    registerMessage(MSGID_CHANGELOG_PORT_ATTR,
        "Specifies the port number that the changelog server will use to" +
        "Specifies the port number that the replication server will use to" +
        " listen for connections from LDAP servers");
    registerMessage(MSGID_WINDOW_SIZE_ATTR,
        "Specifies the receive window size of the changelog server");
        "Specifies the receive window size of the replication server");
    registerMessage(MSGID_QUEUE_SIZE_ATTR,
        "Specifies the receive queue size of the changelog server." +
        " The Changelog servers will queue up to this number of messages" +
        "Specifies the receive queue size of the replication server." +
        " The replication servers will queue up to this number of messages" +
        " in its memory queue and save the older messages to persistent" +
        " storage. Using a larger size may improve performances when" +
        " The replication delay is larger than this size but at the cost" +
        " of using more memory");
    registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR,
        "Specifies the Changelog Server directory. The Changelog server" +
        "Specifies the replication server directory. The replication server" +
        " will create all persistent storage below this path");
    registerMessage(MSGID_PURGE_DELAY_ATTR,
        "Specifies the Changelog Purge Delay, The Changelog servers will" +
        "Specifies the Changelog Purge Delay, The replication servers will" +
        " keep all changes up to this amount of time before deleting them." +
        " This values defines the maximum age of a backup that can be" +
        " restored because changelog servers would not be able to refresh" +
        " restored because replication servers would not be able to refresh" +
        " LDAP servers with older versions of the data. A zero value" +
        " can be used to specify an infinite delay (or never purge)");
    registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
@@ -504,5 +520,9 @@
        "The provider class does not allow the operation requested");
    registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME,
        "The hostname %s could not be resolved as an IP address");
    registerMessage(MSGID_READER_NULL_MSG,
        "Received a Null Msg from %s");
    registerMessage(MSGID_READER_EXCEPTION,
        "Exception when reading messages from %s");
  }
}
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -27,11 +27,14 @@
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -42,6 +45,11 @@
 */
public class ListenerThread extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private ReplicationDomain listener;
  private boolean shutdown = false;
@@ -70,23 +78,38 @@
  public void run()
  {
    UpdateMessage msg;
    boolean done = false;
    try
    if (debugEnabled())
    {
      while (((msg = listener.receive()) != null) && (shutdown == false))
      TRACER.debugInfo("Replication Listener thread starting.");
    }
    while (!done)
    {
      try
      {
        listener.replay(msg);
        while (((msg = listener.receive()) != null) && (shutdown == false))
        {
          listener.replay(msg);
        }
        done = true;
      } catch (Exception e)
      {
        /*
         * catch all exceptions happening in listener.receive and
         * listener.replay so that the thread never dies even in case
         * of problems.
         */
        int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE;
        String message = getMessage(msgID, stackTraceToSingleLineString(e));
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      }
    } catch (Exception e)
    }
    if (debugEnabled())
    {
      /*
       * catch all exceptions happening in listener.receive and listener.replay
       * so that the thread never dies even in case of problems.
       */
      int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE;
      String message = getMessage(msgID, stackTraceToSingleLineString(e));
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      TRACER.debugInfo("Replication Listener thread stopping.");
    }
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,7 +207,7 @@
    }
    boolean checkState = true;
    while( !connected)
    while ((!connected) && (!shutdown))
    {
      for (String server : servers)
      {
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -126,6 +126,12 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * on shutdown, the server will wait for existing threads to stop
   * during this timeout (in ms).
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  private ReplicationMonitor monitor;
  private ReplicationBroker broker;
@@ -1047,6 +1053,12 @@
    // stop the ReplicationBroker
    broker.stop();
    //  wait for the listener thread to stop
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
    }
  }
  /**
@@ -1745,6 +1757,17 @@
      thread.shutdown();
    }
    broker.stop(); // this will cut the session and wake-up the listeners
    for (ListenerThread thread : synchroThreads)
    {
      try
      {
        thread.join(SHUTDOWN_JOIN_TIMEOUT);
      } catch (InterruptedException e)
      {
        // ignore
      }
    }
  }
  /**
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -29,6 +29,7 @@
import org.opends.server.api.DirectoryThread;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import java.io.IOException;
@@ -66,7 +67,7 @@
  /**
   * Set this to stop the thread.
   */
  private boolean shutdown = false;
  private Boolean shutdown = false;
  /**
@@ -133,7 +134,14 @@
          {
            TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
          }
          Thread.sleep(sleepTime);
          synchronized (shutdown)
          {
            if (!shutdown)
            {
              shutdown.wait(sleepTime);
            }
          }
        }
        catch (InterruptedException e)
        {
@@ -161,10 +169,23 @@
  /**
   * Call this method to stop the thread.
   * This method is blocking until the thread has stopped.
   */
  public void shutdown()
  {
    shutdown = true;
    synchronized (shutdown)
    {
      shutdown.notifyAll();
      shutdown = true;
      if (debugEnabled())
      {
        TRACER.debugInfo("Going to notify Heartbeat thread.");
      }
    }
    if (debugEnabled())
    {
      TRACER.debugInfo("Returning from Heartbeat shutdown.");
    }
  }
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -26,6 +26,9 @@
 */
package org.opends.server.replication.protocol;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,6 +36,8 @@
import java.net.SocketException;
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
/**
 * This class Implement a protocol session using a basic socket and relying on
 * the innate encoding/decoding capabilities of the ReplicationMessage
@@ -44,6 +49,11 @@
 */
public class SocketSession implements ProtocolSession
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private Socket socket;
  private InputStream input;
  private OutputStream output;
@@ -83,6 +93,10 @@
   */
  public void close() throws IOException
  {
    if (debugEnabled())
    {
      TRACER.debugVerbose("Closing SocketSession.");
    }
    socket.close();
  }
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -28,6 +28,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
@@ -79,6 +80,12 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Time during which the server will wait for existing thread to stop
   * during the shutdown.
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  private short serverId;
  private ProtocolSession session;
  private final MsgQueue msgQueue = new MsgQueue();
@@ -747,7 +754,7 @@
  private UpdateMessage getnextMessage()
  {
    UpdateMessage msg;
    do
    while (active == true)
    {
      if (following == false)
      {
@@ -884,7 +891,7 @@
       * the first check at the beginning of this method
       * and the second check just above.
       */
    } while (active == true);
    }
    return null;
  }
@@ -905,6 +912,15 @@
  public void stopHandler()
  {
    active = false;
    try
    {
      session.close();
    } catch (IOException e)
    {
      // ignore.
    }
    synchronized (msgQueue)
    {
      /* wake up the writer thread on an empty queue so that it disappear */
@@ -1218,7 +1234,17 @@
    {
      // Service is closing.
    }
    stopHandler();
    try
    {
      writer.join(SHUTDOWN_JOIN_TIMEOUT);
      reader.join(SHUTDOWN_JOIN_TIMEOUT);
    } catch (InterruptedException e)
    {
      // don't try anymore to join and return.
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -29,6 +29,8 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.io.IOException;
@@ -45,6 +47,7 @@
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
/**
@@ -59,6 +62,11 @@
 */
public class ServerReader extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private short serverId;
  private ProtocolSession session;
  private ServerHandler handler;
@@ -87,8 +95,18 @@
   */
  public void run()
  {
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server reader starting " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server reader starting " + serverId);
      }
    }
    /*
     * TODO : catch exceptions in case of bugs
     * wait on input stream
     * grab all incoming messages and publish them to the replicationCache
     */
@@ -98,12 +116,6 @@
      {
        ReplicationMessage msg = session.receive();
        if (msg == null)
        {
          // TODO : generate error in the log
          // make sure that connection is closed
          return;
        }
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
@@ -147,7 +159,19 @@
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
        else if (msg == null)
        {
          /*
           * The remote server has sent an unknown message,
           * close the conenction.
           */
          int    msgID   = MSGID_READER_NULL_MSG;
          String message = getMessage(msgID, handler.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          return;
        }
      }
    } catch (IOException e)
    {
@@ -160,7 +184,7 @@
      String message = getMessage(msgID, handler.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.NOTICE,
               message, msgID);
               message + e.getMessage(), msgID);
    } catch (ClassNotFoundException e)
    {
      /*
@@ -174,7 +198,15 @@
               message, msgID);
    } catch (Exception e)
    {
      /*
       * The remote server has sent an unknown message,
       * close the conenction.
       */
      int    msgID   = MSGID_READER_EXCEPTION;
      String message = getMessage(msgID, handler.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    }
    finally
    {
@@ -192,5 +224,16 @@
      }
      replicationCache.stopServer(handler);
    }
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server reader stopping " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server reader stopping " + serverId);
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -27,6 +27,8 @@
package org.opends.server.replication.server;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
@@ -35,6 +37,7 @@
import java.util.NoSuchElementException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.ErrorLogCategory;
@@ -47,9 +50,15 @@
 */
public class ServerWriter extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationCache replicationCache;
  private short serverId;
  /**
   * Create a ServerWriter.
@@ -66,6 +75,7 @@
  {
    super(handler.toString() + " writer");
    this.serverId = serverId;
    this.session = session;
    this.handler = handler;
    this.replicationCache = replicationCache;
@@ -78,6 +88,17 @@
   */
  public void run()
  {
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server writer starting " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server writer starting " + serverId);
      }
    }
    try {
      while (true)
      {
@@ -132,6 +153,18 @@
       // Can't do much more : ignore
      }
      replicationCache.stopServer(handler);
      if (debugEnabled())
      {
        if (handler.isReplicationServer())
        {
          TRACER.debugInfo("Replication server writer stopping " + serverId);
        }
        else
        {
          TRACER.debugInfo("LDAP server writer stopping " + serverId);
        }
      }
    }
  }
}