mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.55.2011 296090552c8bb9ad83cfa819fbf0bd9d0ba8fac2
Fix OpenDJ-117: IllegalMonitorStateException during server shutdown
6 files modified
256 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 201 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 22 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -706,6 +706,7 @@
      Message message = Message.raw(
          "Protocol error: StartSessionMsg required." + msg + " received.");
      abortStart(message);
      return null;
    }
    // Process StartSessionMsg sent by remote DS
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -539,17 +539,20 @@
      // client wants to stop handshake (was just for handshake phase one for RS
      // choice). Return null to make the session be terminated.
      return null;
    } else if (!(msg instanceof StartECLSessionMsg))
    {
      Message message = Message.raw(
          "Protocol error: StartECLSessionMsg required." + msg + " received.");
      abortStart(message);
    }
    else if (!(msg instanceof StartECLSessionMsg))
    {
      Message message = Message
          .raw("Protocol error: StartECLSessionMsg required." + msg
              + " received.");
      abortStart(message);
      return null;
    }
    else
    {
    // Process StartSessionMsg sent by remote DS
    StartECLSessionMsg startECLSessionMsg = (StartECLSessionMsg) msg;
    return startECLSessionMsg;
      return (StartECLSessionMsg) msg;
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1064,22 +1064,25 @@
    if (!handler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
      {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          if (!shutdown)
          {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
            lock();
          }
        } catch (InterruptedException ex)
        catch (InterruptedException ex)
        {
          // Try doing job anyway...
          // We can't deal with this here, so re-interrupt thread so that it is
          // caught during subsequent IO.
          Thread.currentThread().interrupt();
          return;
        }
        }
      try
      {
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        {
@@ -1179,15 +1182,20 @@
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      }
      catch (InterruptedException ex)
      {
        // We can't deal with this here, so re-interrupt thread so that it is
        // caught during subsequent IO.
        Thread.currentThread().interrupt();
        return;
      }
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          lock();
        } catch (InterruptedException ex)
        {
          // Try doing job anyway...
        }
        if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
@@ -1779,28 +1787,35 @@
    return returnMsg;
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * topology directly connected to this RS. This includes information for:
   * - local RS
   * - all direct DSs
   * - all direct RSs
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   * topology directly connected to this RS. This includes information for: -
   * local RS - all direct DSs - all direct RSs
   *
   * @param sender
   *          The sender of this message.
   * @param destination
   *          The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if the current thread
   *         was interrupted while attempting to get the domain lock.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg monitorMsg = null;
    try {
    try
    {
      // Lock domain as we need to go through connected servers list
      lock();
    }
    catch (InterruptedException e)
    {
      return null;
    }
      monitorMsg = new MonitorMsg(sender, destination);
    try
    {
      MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      // Populate for each connected LDAP Server
      // from the states stored in the serverHandler.
@@ -1808,35 +1823,27 @@
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      {
        monitorMsg.setServerState(
          lsh.getServerId(),
          lsh.getServerState(),
          lsh.getApproxFirstMissingDate(),
        monitorMsg.setServerState(lsh.getServerId(),
            lsh.getServerState(), lsh.getApproxFirstMissingDate(),
          true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      {
        monitorMsg.setServerState(
          rsh.getServerId(),
          rsh.getServerState(),
          rsh.getApproxFirstMissingDate(),
        monitorMsg.setServerState(rsh.getServerId(),
            rsh.getServerState(), rsh.getApproxFirstMissingDate(),
          false);
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(this.getDbServerState());
    } catch(InterruptedException e)
      return monitorMsg;
    }
    finally
    {
      // At lock, too bad...
    } finally
    {
      if (hasLock())
        release();
    }
    return monitorMsg;
  }
  /**
@@ -2126,18 +2133,23 @@
          "In " + this +
          " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
          " for baseDn " + baseDn + ":\n" + genIdMsg);
    try
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
      {
        // Try doing job anyway...
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
      }
    try
    {
      long newGenId = genIdMsg.getGenerationId();
      if (newGenId != this.generationId)
@@ -2233,16 +2245,20 @@
    try
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
      {
        // Try doing job anyway...
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
      }
    try
    {
      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
@@ -2258,7 +2274,6 @@
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          senderHandler.getServerId(), baseDn, newStatus.toString());
      logError(message);
    }
    catch(Exception e)
    {
@@ -2278,17 +2293,16 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
    StatusMachineEvent event)
  {
    try
  public boolean changeStatusFromStatusAnalyzer(
      DataServerHandler serverHandler, StatusMachineEvent event)
    {
    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
    {
      // We have been interrupted for dying, from stopStatusAnalyzer
      // to prevent deadlock in this situation:
@@ -2299,41 +2313,48 @@
      // waiting for analyzer thread death, a deadlock occurs. So we force
      // interruption of the status analyzer thread death after 2 seconds if
      // it has not finished (see StatusAnalyzer.waitForShutdown). This allows
      // to have the analyzer thread taking the domain lock only when the status
      // of a DS has to be changed. See more comments in run method of
      // to have the analyzer thread taking the domain lock only when the
      // status of a DS has to be changed. See more comments in run method of
      // StatusAnalyzer.
      if (debugEnabled())
        TRACER.debugInfo(
          "Status analyzer for domain " + baseDn + " has been interrupted when"
          + " trying to acquire domain lock for changing the status of DS " +
          serverHandler.getServerId());
        TRACER
            .debugInfo("Status analyzer for domain "
                + baseDn
                + " has been interrupted when"
                + " trying to acquire domain lock for changing the status"
                + " of DS "
                + serverHandler.getServerId());
      return true;
    }
    try
    {
    ServerStatus newStatus = ServerStatus.INVALID_STATUS;
    ServerStatus oldStatus = serverHandler.getStatus();
    try
    {
      newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
    } catch (IOException e)
        newStatus = serverHandler
            .changeStatusFromStatusAnalyzer(event);
      }
      catch (IOException e)
    {
      logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER.get(baseDn.
        toString(),
        logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
            .get(baseDn.toString(),
        Integer.toString(serverHandler.getServerId()),
        e.getMessage()));
    }
    if ( (newStatus == ServerStatus.INVALID_STATUS) ||
      (newStatus == oldStatus) )
      if ((newStatus == ServerStatus.INVALID_STATUS)
          || (newStatus == oldStatus))
    {
      // Change was impossible or already occurred (see StatusAnalyzer comments)
        // Change was impossible or already occurred (see StatusAnalyzer
        // comments)
      return false;
    }
    // Update every peers (RS/DS) with topology changes
    buildAndSendTopoInfoToDSs(serverHandler);
    buildAndSendTopoInfoToRSs();
    }
    catch(Exception e)
    {
@@ -2344,6 +2365,7 @@
    {
      release();
    }
    return false;
  }
@@ -2456,11 +2478,14 @@
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      if (!hasLock())
        lock();
    } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
    {
      // Try doing job anyway...
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
@@ -2502,7 +2527,6 @@
       * DS we have.
       */
      buildAndSendTopoInfoToDSs(null);
    }
    catch(Exception e)
    {
@@ -2867,7 +2891,7 @@
   * - when creating and sending a TopologyMsg
   * - when a DS status is changing (ChangeStatusMsg received or sent)...
   */
  private ReentrantLock lock = new ReentrantLock();
  private final ReentrantLock lock = new ReentrantLock();
  /**
   * This lock is used to protect the generationid variable.
@@ -3344,9 +3368,13 @@
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
    {
      // Try doing job anyway...
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
@@ -3356,7 +3384,8 @@
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : replicationServers.values())
        for (ReplicationServerHandler rsHandler : replicationServers
            .values())
        {
          try
          {
@@ -3365,12 +3394,14 @@
            {
              rsHandler.send(msg);
            }
          } catch (IOException e)
          }
          catch (IOException e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                "Replication Server " + replicationServer.getReplicationPort() +
                " " + baseDn + " " + replicationServer.getServerId()));
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                .get("Replication Server "
                    + replicationServer.getReplicationPort() + " "
                    + baseDn + " " + replicationServer.getServerId()));
            stopServer(rsHandler, false);
          }
        }
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -178,7 +178,6 @@
    try
    {
      //
      lockDomain(false); // no timeout
      // Send start
@@ -260,15 +259,11 @@
      logError(message);
      super.finalizeStart();
    }
    catch(IOException ioe)
    {
      // FIXME receive
    }
    // catch(DirectoryException de)
    //{ already logged
    //
    catch(Exception e)
    {
      // FIXME more detailed exceptions
@@ -528,7 +523,7 @@
            msg.getClass().getCanonicalName(),
            "TopologyMsg");
      }
      abortStart(message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    // Remote RS sent his topo msg
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -832,13 +832,20 @@
    return (!this.isDataServer());
  }
  /**
   * Lock the domain potentially with a timeout.
   * @param timedout The provided timeout.
   * @throws DirectoryException When an exception occurs.
   *
   * @param timedout
   *          The provided timeout.
   * @throws DirectoryException
   *           When an exception occurs.
   * @throws InterruptedException
   *           If the current thread was interrupted while waiting for the lock.
   */
  protected void lockDomain(boolean timedout)
  throws DirectoryException
    throws DirectoryException, InterruptedException
  {
    // The handshake phase must be done by blocking any access to structures
    // keeping info on connected servers, so that one can safely check for
@@ -859,8 +866,6 @@
    // If domain already exists, lock it until handshake is finished otherwise
    // it will be created and locked later in the method
    try
    {
      if (!timedout)
      {
        // !timedout
@@ -921,13 +926,6 @@
        }
      }
    }
    catch (InterruptedException e)
    {
      // Thread interrupted
      Message message = ERR_EXCEPTION_LOCKING_RS_DOMAIN.get(e.getMessage());
      logError(message);
    }
  }
  /**
   * Processes a routable message.
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -155,7 +155,7 @@
                StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
              if (interrupted)
              {
                // Finih job and let thread die
                // Finish job and let thread die
                TRACER.debugInfo("Status analyzer for dn " +
                  replicationServerDomain.getBaseDn().toString() +
                  " has been interrupted and will die. This is in RS " +
@@ -173,7 +173,7 @@
                StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
              if (interrupted)
              {
                // Finih job and let thread die
                // Finish job and let thread die
                TRACER.debugInfo("Status analyzer for dn " +
                  replicationServerDomain.getBaseDn().toString() +
                  " has been interrupted and will die. This is in RS " +