mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.55.2011 0435081919c5eb9737d5f9572cbba7e600f5fa99
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1064,22 +1064,25 @@
    if (!handler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
      if (!shutdown)
      {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          if (!shutdown)
          {
            lock();
          }
        } catch (InterruptedException ex)
        {
          // Try doing job anyway...
          lock();
        }
        catch (InterruptedException ex)
        {
          // We can't deal with this here, so re-interrupt thread so that it is
          // caught during subsequent IO.
          Thread.currentThread().interrupt();
          return;
        }
      }
      try
      {
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        {
@@ -1179,15 +1182,20 @@
    {
      try
      {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          lock();
        } catch (InterruptedException ex)
        {
          // Try doing job anyway...
        }
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      }
      catch (InterruptedException ex)
      {
        // We can't deal with this here, so re-interrupt thread so that it is
        // caught during subsequent IO.
        Thread.currentThread().interrupt();
        return;
      }
      try
      {
        if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
@@ -1779,28 +1787,35 @@
    return returnMsg;
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * topology directly connected to this RS. This includes information for:
   * - local RS
   * - all direct DSs
   * - all direct RSs
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   * topology directly connected to this RS. This includes information for: -
   * local RS - all direct DSs - all direct RSs
   *
   * @param sender
   *          The sender of this message.
   * @param destination
   *          The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if the current thread
   *         was interrupted while attempting to get the domain lock.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg monitorMsg = null;
    try {
    try
    {
      // Lock domain as we need to go through connected servers list
      lock();
    }
    catch (InterruptedException e)
    {
      return null;
    }
      monitorMsg = new MonitorMsg(sender, destination);
    try
    {
      MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      // Populate for each connected LDAP Server
      // from the states stored in the serverHandler.
@@ -1808,35 +1823,27 @@
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      {
        monitorMsg.setServerState(
          lsh.getServerId(),
          lsh.getServerState(),
          lsh.getApproxFirstMissingDate(),
          true);
        monitorMsg.setServerState(lsh.getServerId(),
            lsh.getServerState(), lsh.getApproxFirstMissingDate(),
            true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      {
        monitorMsg.setServerState(
          rsh.getServerId(),
          rsh.getServerState(),
          rsh.getApproxFirstMissingDate(),
          false);
        monitorMsg.setServerState(rsh.getServerId(),
            rsh.getServerState(), rsh.getApproxFirstMissingDate(),
            false);
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(this.getDbServerState());
    } catch(InterruptedException e)
    {
      // At lock, too bad...
    } finally
    {
      if (hasLock())
        release();
      return monitorMsg;
    }
    return monitorMsg;
    finally
    {
      release();
    }
  }
  /**
@@ -2126,18 +2133,23 @@
          "In " + this +
          " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
          " for baseDn " + baseDn + ":\n" + genIdMsg);
    try
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      } catch (InterruptedException ex)
      {
        // Try doing job anyway...
      }
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
    {
      long newGenId = genIdMsg.getGenerationId();
      if (newGenId != this.generationId)
@@ -2233,16 +2245,20 @@
    try
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      } catch (InterruptedException ex)
      {
        // Try doing job anyway...
      }
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
    {
      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
@@ -2258,7 +2274,6 @@
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          senderHandler.getServerId(), baseDn, newStatus.toString());
      logError(message);
    }
    catch(Exception e)
    {
@@ -2278,17 +2293,16 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
    StatusMachineEvent event)
  public boolean changeStatusFromStatusAnalyzer(
      DataServerHandler serverHandler, StatusMachineEvent event)
  {
    try
    {
    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
    {
      // We have been interrupted for dying, from stopStatusAnalyzer
      // to prevent deadlock in this situation:
@@ -2299,43 +2313,50 @@
      // waiting for analyzer thread death, a deadlock occurs. So we force
      // interruption of the status analyzer thread death after 2 seconds if
      // it has not finished (see StatusAnalyzer.waitForShutdown). This allows
      // to have the analyzer thread taking the domain lock only when the status
      // of a DS has to be changed. See more comments in run method of
      // to have the analyzer thread taking the domain lock only when the
      // status of a DS has to be changed. See more comments in run method of
      // StatusAnalyzer.
      if (debugEnabled())
        TRACER.debugInfo(
          "Status analyzer for domain " + baseDn + " has been interrupted when"
          + " trying to acquire domain lock for changing the status of DS " +
          serverHandler.getServerId());
        TRACER
            .debugInfo("Status analyzer for domain "
                + baseDn
                + " has been interrupted when"
                + " trying to acquire domain lock for changing the status"
                + " of DS "
                + serverHandler.getServerId());
      return true;
    }
    ServerStatus newStatus = ServerStatus.INVALID_STATUS;
    ServerStatus oldStatus = serverHandler.getStatus();
    try
    {
      newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
    } catch (IOException e)
    {
      logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER.get(baseDn.
        toString(),
        Integer.toString(serverHandler.getServerId()),
        e.getMessage()));
    }
      ServerStatus newStatus = ServerStatus.INVALID_STATUS;
      ServerStatus oldStatus = serverHandler.getStatus();
      try
      {
        newStatus = serverHandler
            .changeStatusFromStatusAnalyzer(event);
      }
      catch (IOException e)
      {
        logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
            .get(baseDn.toString(),
                Integer.toString(serverHandler.getServerId()),
                e.getMessage()));
      }
    if ( (newStatus == ServerStatus.INVALID_STATUS) ||
      (newStatus == oldStatus) )
    {
      // Change was impossible or already occurred (see StatusAnalyzer comments)
      return false;
    }
      if ((newStatus == ServerStatus.INVALID_STATUS)
          || (newStatus == oldStatus))
      {
        // Change was impossible or already occurred (see StatusAnalyzer
        // comments)
        return false;
      }
    // Update every peers (RS/DS) with topology changes
    buildAndSendTopoInfoToDSs(serverHandler);
    buildAndSendTopoInfoToRSs();
      // Update every peers (RS/DS) with topology changes
      buildAndSendTopoInfoToDSs(serverHandler);
      buildAndSendTopoInfoToRSs();
    }
    catch(Exception e)
    catch (Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
@@ -2344,6 +2365,7 @@
    {
      release();
    }
    return false;
  }
@@ -2456,11 +2478,14 @@
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      if (!hasLock())
        lock();
    } catch (InterruptedException ex)
      lock();
    }
    catch (InterruptedException ex)
    {
      // Try doing job anyway...
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
@@ -2502,7 +2527,6 @@
       * DS we have.
       */
      buildAndSendTopoInfoToDSs(null);
    }
    catch(Exception e)
    {
@@ -2867,7 +2891,7 @@
   * - when creating and sending a TopologyMsg
   * - when a DS status is changing (ChangeStatusMsg received or sent)...
   */
  private ReentrantLock lock = new ReentrantLock();
  private final ReentrantLock lock = new ReentrantLock();
  /**
   * This lock is used to protect the generationid variable.
@@ -3344,9 +3368,13 @@
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    }
    catch (InterruptedException ex)
    {
      // Try doing job anyway...
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
@@ -3356,7 +3384,8 @@
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : replicationServers.values())
        for (ReplicationServerHandler rsHandler : replicationServers
            .values())
        {
          try
          {
@@ -3365,12 +3394,14 @@
            {
              rsHandler.send(msg);
            }
          } catch (IOException e)
          }
          catch (IOException e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                "Replication Server " + replicationServer.getReplicationPort() +
                " " + baseDn + " " + replicationServer.getServerId()));
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                .get("Replication Server "
                    + replicationServer.getReplicationPort() + " "
                    + baseDn + " " + replicationServer.getServerId()));
            stopServer(rsHandler, false);
          }
        }