mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.32.2013 3c0945a130c4433a1e2515353c013462d6a83f23
OPENDJ-1255 SEVERE_ERROR (Socket closed) in logs/errors file after uninstallation of a replicated server

Code review: Matthew Swift


The problem lay in the MonitoringPublisher thread, which sends the monitoring information at regular intervals (every 60s by default).
In run(), code was waiting for 60s then happily sending the monitor message regardless of whether the server was shutting down.
However, if the thread was notified by the shutdown process while waiting for those 60s, it immediately tried to send the monitor message.
Since the shutdown had already been initiated, the thread received a SocketException when trying to send the monitor message. This exception was then logged.

The fix consists of checking whether the server is shutting down right after being notified.


MonitoringPublisher.java:
Removed the shutdown instance field (superseded by initiateShutdown() and isShutdownInitiated()).
In run(), moved the call to wait(long) to the end of the loop, so that a notification is immediately followed by the loop condition's check on whether the server is shutting down. Also added a shutdown check in the body of the loop, between each send().
In shutdown(), called initiateShutdown() before entering the synchronized block + moved the logging code outside of the synchronized block.
In waitForShutdown(), replaced the hand-rolled sleep-and-poll loop with Thread.join(long).
1 file modified
50 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java 50 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -46,8 +46,6 @@
public class MonitoringPublisher extends DirectoryThread
{
  private volatile boolean shutdown = false;
  /**
   * The tracer object for the debug logger.
   */
@@ -59,9 +57,6 @@
  /** Sleep time (in ms) before sending new monitoring messages. */
  private volatile long period;
  /** Whether the thread is terminated. */
  private volatile boolean done = false;
  private final Object shutdownLock = new Object();
  /**
@@ -95,16 +90,8 @@
    try
    {
      while (!shutdown)
      while (!isShutdownInitiated())
      {
        synchronized (shutdownLock)
        {
          if (!shutdown)
          {
            shutdownLock.wait(period);
          }
        }
        // Send global topology information to peer DSs
        final int senderId = domain.getLocalRSServerId();
        final MonitorMsg monitorMsg =
@@ -112,6 +99,11 @@
        for (ServerHandler serverHandler : domain.getConnectedDSs().values())
        {
          // send() can be long operation, check for shutdown between each calls
          if (isShutdownInitiated())
          {
            break;
          }
          monitorMsg.setDestination(serverHandler.getServerId());
          try
          {
@@ -122,6 +114,15 @@
            // Server is disconnecting ? Forget it
          }
        }
        synchronized (shutdownLock)
        {
          // double check to ensure the call to notify() was not missed
          if (!isShutdownInitiated())
          {
            shutdownLock.wait(period);
          }
        }
      }
    }
    catch (InterruptedException e)
@@ -130,7 +131,6 @@
          "Monitoring publisher has been interrupted while sleeping."));
    }
    done = true;
    TRACER.debugInfo(getMessage("Monitoring publisher is terminated."));
  }
@@ -141,17 +141,16 @@
   */
  public void shutdown()
  {
    initiateShutdown();
    synchronized (shutdownLock)
    {
      shutdown = true;
      shutdownLock.notifyAll();
    }
      if (debugEnabled())
      {
        TRACER.debugInfo(getMessage("Shutting down monitoring publisher."));
      }
    }
  }
  /**
   * Waits for thread death. If not terminated within 2 seconds,
@@ -161,19 +160,10 @@
  {
    try
    {
      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
      int n = 0;
      while (!done && isAlive())
      {
        Thread.sleep(50);
        n++;
        if (n >= FACTOR)
        {
          TRACER.debugInfo(getMessage("Interrupting monitoring publisher."));
          interrupt();
      // Here, "this" is the monitoring publisher thread
      this.join(2000);
        }
      }
    } catch (InterruptedException e)
    catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }