mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
20.19.2012 e131e3cef7357a2f5acdc3d86979b8c5486ca71b
Fix OPENDJ-457: Sleeping replication threads prevent server from shutting down

Improved responsiveness of several threads:

* remove sleep based looped and replaced with wait/notify loops
* ensured that shutdown signal variables are volatile
* ensured that InterruptedExceptions are treated as shutdown since these are sent by shutdown monitor.
7 files modified
198 ■■■■■ changed files
opends/src/server/org/opends/server/core/IdleTimeLimitThread.java 35 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/ServerShutdownMonitor.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java 42 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 25 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java 34 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java 55 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2012 ForgeRock AS.
 */
package org.opends.server.core;
import org.opends.messages.Message;
@@ -38,6 +39,7 @@
import org.opends.server.types.DisconnectReason;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.ErrorLogger;
import static org.opends.messages.CoreMessages.*;
@@ -60,8 +62,9 @@
  // Indicates whether a shutdown request has been received.
  private boolean shutdownRequested;
  // Shutdown monitor state.
  private volatile boolean shutdownRequested;
  private final Object shutdownLock = new Object();
@@ -93,10 +96,26 @@
    {
      try
      {
        try
        synchronized (shutdownLock)
        {
          sleep(sleepTime);
        } catch (InterruptedException ie) {}
          if (!shutdownRequested)
          {
            try
            {
              shutdownLock.wait(sleepTime);
            }
            catch (InterruptedException e)
            {
              // Server shutdown monitor may interrupt slow threads.
              if (debugEnabled())
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, e);
              }
              shutdownRequested = true;
              break;
            }
          }
        }
        sleepTime = 5000L;
        for (ConnectionHandler<?> ch : DirectoryServer.getConnectionHandlers())
@@ -181,7 +200,11 @@
   */
  public void processServerShutdown(Message reason)
  {
    shutdownRequested = true;
    synchronized (shutdownLock)
    {
      shutdownRequested = true;
      shutdownLock.notifyAll();
    }
  }
}
opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2012 ForgeRock AS.
 */
package org.opends.server.core;
@@ -43,7 +44,7 @@
{
  // Indicates whether the monitor has completed and the shutdown may be
  // finalized with a call to System.exit;
  private boolean monitorDone;
  private volatile boolean monitorDone;
  // The list of threads that need to be monitored.
  private LinkedList<Thread> threadList;
opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 *      Portions copyright 2011-2012 ForgeRock AS
 */
package org.opends.server.protocols.ldap;
import static org.opends.messages.ProtocolMessages.*;
@@ -155,7 +155,7 @@
  // Indicates whether the Directory Server is in the process of
  // shutting down.
  private boolean shutdownRequested;
  private volatile boolean shutdownRequested;
  /* Internal LDAP connection handler state */
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2012 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -32,6 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.DebugLogLevel;
import java.io.IOException;
@@ -123,32 +124,37 @@
          }
        }
        try
        long sleepTime = session.getLastPublishTime() +
            heartbeatInterval - now;
        if (sleepTime <= 0)
        {
          long sleepTime = session.getLastPublishTime() +
              heartbeatInterval - now;
          if (sleepTime <= 0)
          {
            sleepTime = heartbeatInterval;
          }
          sleepTime = heartbeatInterval;
        }
          if (debugEnabled())
          {
            TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
          }
        if (debugEnabled())
        {
          TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
        }
          synchronized (shutdownLock)
        synchronized (shutdownLock)
        {
          if (!shutdown)
          {
            if (!shutdown)
            try
            {
              shutdownLock.wait(sleepTime);
            }
            catch (InterruptedException e)
            {
              // Server shutdown monitor may interrupt slow threads.
              if (debugEnabled())
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, e);
              }
              shutdown = true;
            }
          }
        }
        catch (InterruptedException e)
        {
          // Keep looping.
        }
      }
    }
    catch (IOException e)
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2012 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -34,6 +34,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.types.DebugLogLevel;
/**
 * This thread is in charge of periodically determining if the connected
@@ -100,21 +101,25 @@
    boolean interrupted = false;
    while (!shutdown && !interrupted)
    {
      try
      synchronized (shutdownLock)
      {
        synchronized (shutdownLock)
        if (!shutdown)
        {
          if (!shutdown)
          try
          {
            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
          }
          catch (InterruptedException e)
          {
            // Server shutdown monitor may interrupt slow threads.
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
            shutdown = true;
            break;
          }
        }
      } catch (InterruptedException ex)
      {
        TRACER.debugInfo("Status analyzer for dn " +
            replicationServerDomain.getBaseDn().toString() + " in RS " +
            replicationServerDomain.getReplicationServer().getServerId() +
            " has been interrupted while sleeping.");
      }
      // Go through each connected DS, get the number of pending changes we have
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2012 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -35,6 +35,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.TimeThread;
import java.io.IOException;
@@ -111,27 +112,32 @@
          session.publish(ctHeartbeatMsg);
        }
        try
        long sleepTime = session.getLastPublishTime() +
            heartbeatInterval - now;
        if (sleepTime <= 0)
        {
          long sleepTime = session.getLastPublishTime() +
              heartbeatInterval - now;
          if (sleepTime <= 0)
          {
            sleepTime = heartbeatInterval;
          }
          sleepTime = heartbeatInterval;
        }
          synchronized (shutdownLock)
        synchronized (shutdownLock)
        {
          if (!shutdown)
          {
            if (!shutdown)
            try
            {
              shutdownLock.wait(sleepTime);
            }
            catch (InterruptedException e)
            {
              // Server shutdown monitor may interrupt slow threads.
              if (debugEnabled())
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, e);
              }
              shutdown = true;
            }
          }
        }
        catch (InterruptedException e)
        {
          // Keep looping.
        }
      }
    }
    catch (IOException e)
opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2012 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -31,10 +31,10 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.api.DirectoryThread;
@@ -70,9 +70,10 @@
  /**
   * Set this to stop the thread.
   * Thread life-cycle state.
   */
  private volatile boolean shutdown = false;
  private final Object shutdownLock = new Object();
@@ -110,7 +111,11 @@
   */
  public void shutdown()
  {
    shutdown = true;
    synchronized (shutdownLock)
    {
      shutdown = true;
      shutdownLock.notifyAll();
    }
  }
  /**
@@ -119,27 +124,30 @@
  @Override
  public void run()
  {
    boolean gotOneFailure = false;
    if (debugEnabled())
    {
      TRACER.debugInfo(this + " is starting, expected interval is " +
                heartbeatInterval);
    }
    try
    {
      boolean gotOneFailure = false;
      while (!shutdown)
      {
        long now = System.currentTimeMillis();
        long lastReceiveTime = session.getLastReceiveTime();
        if (now > lastReceiveTime + heartbeatInterval)
        {
          if (gotOneFailure == true)
          if (gotOneFailure)
          {
            // Heartbeat is well overdue so the server is assumed to be dead.
            logError(WARN_HEARTBEAT_FAILURE.get(serverID,
                replicationServerID,
                session.getReadableRemoteAddress(), baseDN));
            session.close();
                replicationServerID, session.getReadableRemoteAddress(),
                baseDN));
            // Exit monitor and close session.
            shutdown = true;
            break;
          }
          else
@@ -151,13 +159,26 @@
        {
          gotOneFailure = false;
        }
        try
        // Sleep.
        synchronized (shutdownLock)
        {
          Thread.sleep(heartbeatInterval);
        }
        catch (InterruptedException e)
        {
          // That's OK.
          if (!shutdown)
          {
            try
            {
              shutdownLock.wait(heartbeatInterval);
            }
            catch (InterruptedException e)
            {
              // Server shutdown monitor may interrupt slow threads.
              if (debugEnabled())
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, e);
              }
              shutdown = true;
            }
          }
        }
      }
    }
@@ -165,9 +186,9 @@
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Heartbeat monitor is exiting." +
            stackTraceToSingleLineString(new Exception()));
        TRACER.debugInfo("Heartbeat monitor is exiting");
      }
      session.close();
    }
  }
}