mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
29.59.2012 905277605ce8651332e1d2eb0752b24c7a739ac8
Fix OPENDJ-457: Sleeping replication threads prevent server from shutting down

Improve synchronization during broker start/stop.
3 files modified, 200 lines changed in total:
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java (4 lines changed)
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java (182 lines changed)
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java (14 lines changed)
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -31,10 +31,10 @@
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -98,7 +98,7 @@
{
  private ReplicationServerListener replicationServerListener = null;
  private static Map<DN, LDAPReplicationDomain> domains =
    new HashMap<DN, LDAPReplicationDomain>() ;
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
  /**
   * The queue of received update messages, to be treated by the ReplayThread
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private volatile boolean shutdown = false;
  private final Object startStopLock = new Object();
  private volatile Collection<String> servers;
  private volatile boolean connected = false;
  private volatile String replicationServer = "Not connected";
@@ -235,9 +236,12 @@
   */
  public void start()
  {
    shutdown = false;
    this.rcvWindow = this.maxRcvWindow;
    this.connect();
    synchronized (startStopLock)
    {
      shutdown = false;
      this.rcvWindow = this.maxRcvWindow;
      this.connect();
    }
  }
  /**
@@ -247,21 +251,23 @@
   */
  public void start(Collection<String> servers)
  {
    /*
     * Open Socket to the ReplicationServer
     * Send the Start message
     */
    shutdown = false;
    this.servers = servers;
    if (servers.size() < 1)
    synchronized (startStopLock)
    {
      Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
      logError(message);
    }
      /*
       * Open Socket to the ReplicationServer Send the Start message
       */
      shutdown = false;
      this.servers = servers;
    this.rcvWindow = this.maxRcvWindow;
    this.connect();
      if (servers.size() < 1)
      {
        Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
        logError(message);
      }
      this.rcvWindow = this.maxRcvWindow;
      this.connect();
    }
  }
  /**
@@ -2115,7 +2121,7 @@
   */
  public void reStart(boolean infiniteTry)
  {
    reStart(this.session, infiniteTry);
    reStart(session, infiniteTry);
  }
  /**
@@ -2126,7 +2132,6 @@
   */
  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
    {
      failingSession.close();
@@ -2135,43 +2140,64 @@
    if (failingSession == session)
    {
      this.connected = false;
      connected = false;
      rsGroupId = (byte) -1;
      rsServerId = -1;
      rsServerUrl = null;
      session = null;
    }
    while (!this.connected && (!this.shutdown))
    while (true)
    {
      try
      // Synchronize inside the loop in order to allow shutdown.
      boolean needSleep = false;
      synchronized (startStopLock)
      {
        this.connect();
      } catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
          baseDn, e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        if (connected || shutdown)
        {
          break;
        }
        try
        {
          connect();
        }
        catch (Exception e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
              e.getLocalizedMessage()));
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
        }
        if (connected || !infiniteTry)
        {
          break;
        }
        needSleep = true;
      }
      if ((!connected) && (!infiniteTry))
        break;
      if ((!connected) && (!shutdown))
      if (needSleep)
      {
        try
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        }
        catch (InterruptedException e)
        {
          // ignore
        }
      }
    }
    if (debugEnabled())
      debugInfo(this +
          " end restart : connected=" + connected +
          " with RSid=" + this.getRsServerId() +
          " genid=" + this.generationID);
    {
      debugInfo(this + " end restart : connected=" + connected + " with RSid="
          + this.getRsServerId() + " genid=" + this.generationID);
    }
  }
  /**
@@ -2376,9 +2402,9 @@
      boolean reconnectOnFailure, boolean returnOnTopoChange)
    throws SocketTimeoutException
  {
    while (shutdown == false)
    while (!shutdown)
    {
      if ((reconnectOnFailure) && (!connected))
      if (reconnectOnFailure && !connected)
      {
        // infinite try to reconnect
        reStart(null, true);
@@ -2386,12 +2412,17 @@
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final ProtocolSession failingSession = session;
      final int replicationServerID = rsServerId;
      final ProtocolSession savedSession = session;
      if (savedSession == null)
      {
        // Must be shutting down.
        break;
      }
      final int replicationServerID = rsServerId;
      try
      {
        ReplicationMsg msg = session.receive();
        ReplicationMsg msg = savedSession.receive();
        if (msg instanceof UpdateMsg)
        {
          synchronized (this)
@@ -2403,7 +2434,8 @@
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        } else if (msg instanceof TopologyMsg)
        }
        else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg);
@@ -2417,20 +2449,22 @@
          if (returnOnTopoChange)
            return msg;
        } else if (msg instanceof StopMsg)
        }
        else if (msg instanceof StopMsg)
        {
          /*
           * RS performs a proper disconnection
           */
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
              .get(replicationServerID,
                  failingSession.getReadableRemoteAddress(),
                  serverId, baseDn);
                  savedSession.getReadableRemoteAddress(),
              serverId, baseDn);
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession, true);
        } else if (msg instanceof MonitorMsg)
          this.reStart(savedSession, true);
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
@@ -2490,14 +2524,14 @@
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      failingSession.getReadableRemoteAddress(),
                      savedSession.getReadableRemoteAddress(),
                      baseDn);
                }
                else
                {
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      failingSession.getReadableRemoteAddress(),
                      savedSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(), baseDn);
                }
                logError(message);
@@ -2508,36 +2542,45 @@
              mustRunBestServerCheckingAlgorithm = 0;
            }
          }
        } else
        }
        else
        {
          return msg;
        }
      } catch (SocketTimeoutException e)
      }
      catch (SocketTimeoutException e)
      {
        throw e;
      } catch (Exception e)
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        if (shutdown == false)
        if (!shutdown)
        {
          if ((session == null) || (!session.closeInitiated()))
          final ProtocolSession tmpSession = session;
          if (tmpSession == null || !tmpSession.closeInitiated())
          {
            /*
             * We did not initiate the close on our side, log an error message.
             */
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
                .get(serverId, baseDn, replicationServerID,
                    failingSession.getReadableRemoteAddress());
                    savedSession.getReadableRemoteAddress());
            logError(message);
          }
          if (reconnectOnFailure)
            reStart(failingSession, true);
          {
            reStart(savedSession, true);
          }
          else
            break; // does not seem necessary to explicitely disconnect ..
          {
            break; // does not seem necessary to explicitly disconnect ..
          }
        }
      }
    } // while !shutdown
@@ -2614,17 +2657,20 @@
        " close the connection to replication server " + rsServerId + " for" +
        " domain " + baseDn);
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
    rsGroupId = (byte) -1;
    rsServerId = -1;
    rsServerUrl = null;
    if (session != null)
    synchronized (startStopLock)
    {
      session.close();
      shutdown = true;
      connected = false;
      stopRSHeartBeatMonitoring();
      stopChangeTimeHeartBeatPublishing();
      replicationServer = "stopped";
      rsGroupId = (byte) -1;
      rsServerId = -1;
      rsServerUrl = null;
      if (session != null)
      {
        session.close();
      }
    }
  }
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3080,20 +3080,20 @@
  {
    synchronized (sessionLock)
    {
      // Stop the listener thread
      if (listenerThread != null)
      {
        listenerThread.shutdown();
      }
      // Stop the broker first in order to prevent the listener from
      // reconnecting - see OPENDJ-457.
      if (broker != null)
      {
        broker.stop();
      }
      // Wait for the listener thread to stop
      // Stop the listener thread
      if (listenerThread != null)
      {
        listenerThread.shutdown();
        listenerThread.waitForShutdown();
        listenerThread = null;
      }
    }
  }