mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.32.2013 d51eba690902925cd89024bef2800e2232123da6
Enforced ReplicationServerDomain responsibilities by increasing encapsulation.


ReplicationServer.java:
Added getConnectedRSUrls(), that aggregates code from ReplicationServerDomain.getChangelogs() and from runConnect().
In applyConfigurationChange(), moved code to ReplicationServerDomain for better encapsulation.

ReplicationServerDomain.java:
Renamed checkForDuplicateDS() into isAlreadyConnectedToDS().
Renamed checkForDuplicateRS() into isAlreadyConnectedToRS().
Moved getChangelogs() to ReplicationServer.
Made several methods private to reduce coupling and enforce class responsibilities.
Generalized the use of isRunningStatusAnalyzer() and isRunningMonitoringPublisher() to improved readability.
Inlined updateStatusAnalyzer() and updateMonitoringPublisher().
Moved code from ... to create updateDegradedStatusThreshold() and updateMonitoringPeriod().
In stopReplicationServers(), renamed parameter for increased readability.

DataServerHandler.java:
Inlined createStatusAnalyzer().

ServerHandler.java, ReplicationServerHandler.java:
Inlined createMonitoringPublisher().
In lockDomain(), removed useless else.
Code cleanup.

StatusAnalyzer.java:
Extracted isInterrupted().
Extracted getMessage() to make error messages more conherent.
6 files modified
709 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 19 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 69 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 255 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 33 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 234 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 99 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -229,14 +229,6 @@
    return newStatus;
  }
  private void createStatusAnalyzer()
  {
    if (!replicationServerDomain.isRunningStatusAnalyzer())
    {
      replicationServerDomain.startStatusAnalyzer();
    }
  }
  /**
   * Retrieves a set of attributes containing monitor data that should be
   * returned to the client if the corresponding monitor entry is requested.
@@ -457,8 +449,7 @@
      localGenerationId = replicationServerDomain.getGenerationId();
      oldGenerationId = localGenerationId;
      // Duplicate server ?
      if (!replicationServerDomain.checkForDuplicateDS(this))
      if (replicationServerDomain.isAlreadyConnectedToDS(this))
      {
        abortStart(null);
        return;
@@ -468,7 +459,6 @@
      {
        StartMsg outStartMsg = sendStartToRemote();
        // log
        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
        // The session initiator decides whether to use SSL.
@@ -508,11 +498,8 @@
        throw new DirectoryException(ResultCode.OTHER, null, null);
      }
      // Create the status analyzer for the domain if not already started
      createStatusAnalyzer();
      // Create the monitoring publisher for the domain if not already started
      createMonitoringPublisher();
      replicationServerDomain.startStatusAnalyzer();
      replicationServerDomain.startMonitoringPublisher();
      registerIntoDomain();
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -399,11 +399,7 @@
        for (ReplicationServerDomain domain : getReplicationServerDomains())
        {
          // Create a normalized set of server URLs.
          final Set<String> connectedReplServers = new HashSet<String>();
          for (String url : domain.getChangelogs())
          {
            connectedReplServers.add(normalizeServerURL(url));
          }
          final Set<String> connectedRSUrls = getConnectedRSUrls(domain);
          /*
           * check that all replication server in the config are in the
@@ -440,7 +436,7 @@
            // Don't connect to a server if it is already connected.
            final String normalizedServerURL = normalizeServerURL(aServerURL);
            if (connectedReplServers.contains(normalizedServerURL))
            if (connectedRSUrls.contains(normalizedServerURL))
            {
              continue;
            }
@@ -472,6 +468,16 @@
    }
  }
  private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
  {
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : domain.getConnectedRSs().values())
    {
      results.add(normalizeServerURL(handler.getServerAddressURL()));
    }
    return results;
  }
  /**
   * Establish a connection to the server with the address and port.
   *
@@ -1039,59 +1045,23 @@
    // Update threshold value for status analyzers (stop them if requested
    // value is 0)
    if (degradedStatusThreshold != configuration
        .getDegradedStatusThreshold())
    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
    {
      int oldThresholdValue = degradedStatusThreshold;
      degradedStatusThreshold = configuration
          .getDegradedStatusThreshold();
      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        if (degradedStatusThreshold == 0)
        {
          // Requested to stop analyzers
          domain.stopStatusAnalyzer();
        }
        else if (domain.isRunningStatusAnalyzer())
        {
          // Update the threshold value for this running analyzer
          domain.updateStatusAnalyzer(degradedStatusThreshold);
        }
        else if (oldThresholdValue == 0)
        {
          // Requested to start analyzers with provided threshold value
          if (domain.getConnectedDSs().size() > 0)
            domain.startStatusAnalyzer();
        }
        domain.updateDegradedStatusThreshold(degradedStatusThreshold);
      }
    }
    // Update period value for monitoring publishers (stop them if requested
    // value is 0)
    if (monitoringPublisherPeriod != configuration
        .getMonitoringPeriod())
    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
    {
      long oldMonitoringPeriod = monitoringPublisherPeriod;
      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        if (monitoringPublisherPeriod == 0L)
        {
          // Requested to stop monitoring publishers
          domain.stopMonitoringPublisher();
        }
        else if (domain.isRunningMonitoringPublisher())
        {
          // Update the threshold value for this running monitoring publisher
          domain.updateMonitoringPublisher(monitoringPublisherPeriod);
        }
        else if (oldMonitoringPeriod == 0L)
        {
          // Requested to start monitoring publishers with provided period value
          if ((domain.getConnectedDSs().size() > 0)
              || (domain.getConnectedRSs().size() > 0))
            domain.startMonitoringPublisher();
        }
        domain.updateMonitoringPeriod(monitoringPublisherPeriod);
      }
    }
@@ -1126,7 +1096,7 @@
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
  /*
  /**
   * Try and set a sensible URL for this replication server. Since we are
   * listening on all addresses there are a couple of potential candidates: 1) a
   * matching server url in the replication server's configuration, 2) hostname
@@ -1666,8 +1636,7 @@
    }
    if (debugEnabled())
      TRACER.debugInfo("In " + this +
        " getEligibleCN() ends with " +
      TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
        " the following domainEligibleCN for each domain :" + debugLog +
        " thus CrossDomainEligibleCN=" + eligibleCN +
        "  ts=" + new Date(eligibleCN.getTime()).toString());
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -940,14 +940,14 @@
  /**
   * Stop operations with a list of replication servers.
   *
   * @param replServers the replication servers for which
   * we want to stop operations
   * @param replServerURLs
   *          the replication servers URLs for which we want to stop operations
   */
  public void stopReplicationServers(Collection<String> replServers)
  public void stopReplicationServers(Collection<String> replServerURLs)
  {
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
      if (replServerURLs.contains(handler.getServerAddressURL()))
      {
        stopServer(handler, false);
      }
@@ -976,12 +976,13 @@
  }
  /**
   * Checks that a DS is not connected with same id.
   * Checks whether it is already connected to a DS with same id.
   *
   * @param handler the DS we want to check
   * @return true if this is not a duplicate server
   * @param handler
   *          the DS we want to check
   * @return true if this DS is already connected to the current server
   */
  public boolean checkForDuplicateDS(DataServerHandler handler)
  public boolean isAlreadyConnectedToDS(DataServerHandler handler)
  {
    if (connectedDSs.containsKey(handler.getServerId()))
    {
@@ -991,9 +992,9 @@
          connectedDSs.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
      logError(message);
      return false;
      return true;
    }
    return true;
    return false;
  }
  /**
@@ -1005,6 +1006,7 @@
   */
  public void stopServer(ServerHandler handler, boolean shutdown)
  {
    // TODO JNR merge with stopServer(MessageHandler)
    if (debugEnabled())
    {
      TRACER.debugInfo("In "
@@ -1055,22 +1057,9 @@
          stopMonitoringPublisher();
        }
        if (handler.isReplicationServer())
        if (connectedRSs.containsKey(handler.getServerId()))
        {
          if (connectedRSs.containsKey(handler.getServerId()))
          {
            unregisterServerHandler(handler);
            handler.shutdown();
            // Check if generation id has to be reset
            mayResetGenerationId();
            if (!shutdown)
            {
              // Warn our DSs that a RS or DS has quit (does not use this
              // handler as already removed from list)
              buildAndSendTopoInfoToDSs(null);
            }
          }
          unregisterServerHandler(handler, shutdown, false);
        } else if (connectedDSs.containsKey(handler.getServerId()))
        {
          // If this is the last DS for the domain,
@@ -1086,25 +1075,10 @@
            }
            stopStatusAnalyzer();
          }
          unregisterServerHandler(handler);
          handler.shutdown();
          // Check if generation id has to be reset
          mayResetGenerationId();
          if (!shutdown)
          {
            // Update the remote replication servers with our list
            // of connected LDAP servers
            buildAndSendTopoInfoToRSs();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
          }
          unregisterServerHandler(handler, shutdown, true);
        } else if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
          handler.shutdown();
          unregisterOtherHandler(handler);
        }
      }
      catch(Exception e)
@@ -1122,12 +1096,41 @@
    }
  }
  private void unregisterOtherHandler(MessageHandler handler)
  {
    unRegisterHandler(handler);
    handler.shutdown();
  }
  private void unregisterServerHandler(ServerHandler handler, boolean shutdown,
      boolean isDirectoryServer)
  {
    unregisterServerHandler(handler);
    handler.shutdown();
    // Check if generation id has to be reset
    mayResetGenerationId();
    if (!shutdown)
    {
      if (isDirectoryServer)
      {
        // Update the remote replication servers with our list
        // of connected LDAP servers
        buildAndSendTopoInfoToRSs();
      }
      // Warn our DSs that a RS or DS has quit (does not use this
      // handler as already removed from list)
      buildAndSendTopoInfoToDSs(null);
    }
  }
  /**
   * Stop the handler.
   * @param handler The handler to stop.
   */
  public void stopServer(MessageHandler handler)
  {
    // TODO JNR merge with stopServer(ServerHandler, boolean)
    if (debugEnabled())
    {
      TRACER.debugInfo("In "
@@ -1163,8 +1166,7 @@
      {
        if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
          handler.shutdown();
          unregisterOtherHandler(handler);
        }
      }
      catch(Exception e)
@@ -1269,39 +1271,40 @@
  }
  /**
   * Checks that a remote RS is not already connected to this hosting RS.
   * @param handler The handler for the remote RS.
   * Checks whether a remote RS is already connected to this hosting RS.
   *
   * @param handler
   *          The handler for the remote RS.
   * @return flag specifying whether the remote RS is already connected.
   * @throws DirectoryException when a problem occurs.
   * @throws DirectoryException
   *           when a problem occurs.
   */
  public boolean checkForDuplicateRS(ReplicationServerHandler handler)
  throws DirectoryException
  public boolean isAlreadyConnectedToRS(ReplicationServerHandler handler)
      throws DirectoryException
  {
    ReplicationServerHandler oldHandler =
      connectedRSs.get(handler.getServerId());
    if (oldHandler != null)
        connectedRSs.get(handler.getServerId());
    if (oldHandler == null)
    {
      if (oldHandler.getServerAddressURL().equals(
        handler.getServerAddressURL()))
      {
        // this is the same server, this means that our ServerStart messages
        // have been sent at about the same time and 2 connections
        // have been established.
        // Silently drop this connection.
        return false;
      }
      else
      {
        // looks like two replication servers have the same serverId
        // log an error message and drop this connection.
        Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
          localReplicationServer.getMonitorInstanceName(), oldHandler.
          getServerAddressURL(), handler.getServerAddressURL(),
          handler.getServerId());
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      return false;
    }
    return true;
    if (oldHandler.getServerAddressURL().equals(handler.getServerAddressURL()))
    {
      // this is the same server, this means that our ServerStart messages
      // have been sent at about the same time and 2 connections
      // have been established.
      // Silently drop this connection.
      return true;
    }
    // looks like two replication servers have the same serverId
    // log an error message and drop this connection.
    Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
        localReplicationServer.getMonitorInstanceName(),
        oldHandler.getServerAddressURL(), handler.getServerAddressURL(),
        handler.getServerId());
    throw new DirectoryException(ResultCode.OTHER, message);
  }
  /**
@@ -1327,21 +1330,6 @@
  }
  /**
   * Return a Set of String containing the lists of Replication servers
   * connected to this server.
   * @return the set of connected servers
   */
  public Set<String> getChangelogs()
  {
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      results.add(handler.getServerAddressURL());
    }
    return results;
  }
  /**
   * Return a set containing the server that produced update and known by
   * this replicationServer from all over the topology,
   * whatever directly connected of connected to another RS.
@@ -2861,11 +2849,11 @@
  }
  /**
   * Starts the status analyzer for the domain.
   * Starts the status analyzer for the domain if not already started.
   */
  public void startStatusAnalyzer()
  {
    if (statusAnalyzer == null)
    if (!isRunningStatusAnalyzer())
    {
      int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
@@ -2880,9 +2868,9 @@
  /**
   * Stops the status analyzer for the domain.
   */
  public void stopStatusAnalyzer()
  private void stopStatusAnalyzer()
  {
    if (statusAnalyzer != null)
    if (isRunningStatusAnalyzer())
    {
      statusAnalyzer.shutdown();
      statusAnalyzer.waitForShutdown();
@@ -2894,32 +2882,19 @@
   * Tests if the status analyzer for this domain is running.
   * @return True if the status analyzer is running, false otherwise.
   */
  public boolean isRunningStatusAnalyzer()
  private boolean isRunningStatusAnalyzer()
  {
    return statusAnalyzer != null;
  }
  /**
   * Update the status analyzer with the new threshold value.
   * @param degradedStatusThreshold The new threshold value.
   */
  public void updateStatusAnalyzer(int degradedStatusThreshold)
  {
    if (statusAnalyzer != null)
    {
      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
    }
  }
  /**
   * Starts the monitoring publisher for the domain.
   * Starts the monitoring publisher for the domain if not already started.
   */
  public void startMonitoringPublisher()
  {
    if (monitoringPublisher == null)
    if (!isRunningMonitoringPublisher())
    {
      long period =
        localReplicationServer.getMonitoringPublisherPeriod();
      long period = localReplicationServer.getMonitoringPublisherPeriod();
      if (period > 0) // 0 means no monitoring publisher
      {
        monitoringPublisher = new MonitoringPublisher(this, period);
@@ -2931,9 +2906,9 @@
  /**
   * Stops the monitoring publisher for the domain.
   */
  public void stopMonitoringPublisher()
  private void stopMonitoringPublisher()
  {
    if (monitoringPublisher != null)
    if (isRunningMonitoringPublisher())
    {
      monitoringPublisher.shutdown();
      monitoringPublisher.waitForShutdown();
@@ -2945,24 +2920,12 @@
   * Tests if the monitoring publisher for this domain is running.
   * @return True if the monitoring publisher is running, false otherwise.
   */
  public boolean isRunningMonitoringPublisher()
  private boolean isRunningMonitoringPublisher()
  {
    return monitoringPublisher != null;
  }
  /**
   * Update the monitoring publisher with the new period value.
   * @param period The new period value.
   */
  public void updateMonitoringPublisher(long period)
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.setPeriod(period);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -3384,4 +3347,52 @@
  {
    return this.localReplicationServer.getServerId();
  }
  /**
   * Update the status analyzer with the new threshold value.
   *
   * @param degradedStatusThreshold
   *          The new threshold value.
   */
  void updateDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (degradedStatusThreshold == 0)
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
    }
    else if (isRunningStatusAnalyzer())
    {
      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (getConnectedDSs().size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
    }
  }
  /**
   * Update the monitoring publisher with the new period value.
   *
   * @param period
   *          The new period value.
   */
  void updateMonitoringPeriod(long period)
  {
    if (period == 0)
    {
      // Requested to stop monitoring publishers
      stopMonitoringPublisher();
    }
    else if (isRunningMonitoringPublisher())
    {
      monitoringPublisher.setPeriod(period);
    }
    else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
    {
      // Requested to start monitoring publishers with provided period value
      startMonitoringPublisher();
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -160,7 +160,6 @@
    {
      lockDomain(false); // no timeout
      // Send start
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
      // Wait answer
@@ -174,22 +173,19 @@
          // Remote replication server is probably shutting down or simultaneous
          // cross-connect detected.
          abortStart(null);
          return;
        }
        else
        {
          Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
              .getClass().getCanonicalName(), "ReplServerStartMsg");
          abortStart(message);
          return;
        }
        return;
      }
      // Process hello from remote.
      processStartFromRemote((ReplServerStartMsg)msg);
      processStartFromRemote((ReplServerStartMsg) msg);
      // Duplicate server ?
      if (!replicationServerDomain.checkForDuplicateRS(this))
      if (replicationServerDomain.isAlreadyConnectedToRS(this))
      {
        // Simultaneous cross connect.
        abortStart(null);
@@ -207,10 +203,9 @@
            generationId, false);
      }
      // Log
      logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg);
      // Until here session is encrypted then it depends on the negociation
      // Until here session is encrypted then it depends on the negotiation
      // The session initiator decides whether to use SSL.
      if (!this.sslEncryption)
        session.stopEncryption();
@@ -239,8 +234,7 @@
        logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
        // Create the monitoring publisher for the domain if not already started
        createMonitoringPublisher();
        replicationServerDomain.startMonitoringPublisher();
        /*
        FIXME: i think this should be done for all protocol version !!
@@ -292,7 +286,6 @@
    }
    finally
    {
      // Release domain
      if (replicationServerDomain != null &&
          replicationServerDomain.hasLock())
        replicationServerDomain.release();
@@ -316,8 +309,7 @@
      // lock with timeout
      lockDomain(true);
      // Duplicate server ?
      if (!replicationServerDomain.checkForDuplicateRS(this))
      if (replicationServerDomain.isAlreadyConnectedToRS(this))
      {
        abortStart(null);
        return;
@@ -326,7 +318,6 @@
      this.localGenerationId = replicationServerDomain.getGenerationId();
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
      // log
      logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
      /*
@@ -358,7 +349,6 @@
            .createTopologyMsgForRS();
        sendTopoInfo(outTopoMsg);
        // log
        logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
      }
      else
@@ -390,9 +380,7 @@
        */
      }
      // Create the monitoring publisher for the domain if not already started
      createMonitoringPublisher();
      replicationServerDomain.startMonitoringPublisher();
      registerIntoDomain();
@@ -616,9 +604,7 @@
  public void shutdown()
  {
    super.shutdown();
    /*
     * Stop the remote LSHandler
     */
    // Stop the remote LSHandler
    synchronized (remoteDirectoryServers)
    {
      for (LightweightServerHandler lsh : remoteDirectoryServers.values())
@@ -755,7 +741,7 @@
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(md.getMissingChangesRS(serverId))));
    /* get the Server State */
    // get the Server State
    AttributeBuilder builder = new AttributeBuilder("server-state");
    ServerState state = md.getRSStates(serverId);
    if (state != null)
@@ -769,6 +755,7 @@
    return attributes;
  }
  /**
   * {@inheritDoc}
   */
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
      if (debugEnabled())
        TRACER.debugInfo("In " +
          ((handler != null) ? handler.toString() : "Replication Server") +
          " closing session with err=" +
          providedMsg.toString());
          " closing session with err=" + providedMsg);
      logError(providedMsg);
    }
@@ -125,7 +124,7 @@
   */
  protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
  // Number of updates received from the server in assured safe data mode.
   * Number of updates received from the server in assured safe data mode.
   */
  protected int assuredSdReceivedUpdates = 0;
  /**
@@ -169,7 +168,7 @@
  /**
   * The initial size of the sending window.
   */
  int sendWindowSize;
  protected int sendWindowSize;
  /**
   * remote generation id.
   */
@@ -185,7 +184,7 @@
  /**
   * Group id of this remote server.
   */
  protected byte groupId = (byte) -1;
  protected byte groupId = -1;
  /**
   * The SSL encryption after the negotiation with the peer.
   */
@@ -254,8 +253,7 @@
      closeSession(localSession, reason, this);
    }
    if ((replicationServerDomain != null) &&
        replicationServerDomain.hasLock())
    if (replicationServerDomain != null && replicationServerDomain.hasLock())
      replicationServerDomain.release();
    // If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
    // peer server and the last topo message sent may have failed being
    // sent: in that case retrieve old value of generation id for
    // replication server domain
    if (oldGenerationId != -100)
    if (oldGenerationId != -100 && replicationServerDomain != null)
    {
      if (replicationServerDomain!=null)
        replicationServerDomain.changeGenerationId(oldGenerationId, false);
      replicationServerDomain.changeGenerationId(oldGenerationId, false);
    }
  }
@@ -304,7 +301,6 @@
  @Override
  public boolean engageShutdown()
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
@@ -340,13 +336,11 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, this,
          replicationServerDomain);
      writer = new ServerWriter(session, this, replicationServerDomain);
      reader = new ServerReader(session, this);
      session.setName("Replication server RS("
          + this.getReplicationServerId()
          + ") session thread to " + this.toString() + " at "
      session.setName("Replication server RS(" + getReplicationServerId()
          + ") session thread to " + this + " at "
          + session.getReadableRemoteAddress());
      session.start();
      try
@@ -366,9 +360,8 @@
      // Create a thread to send heartbeat messages.
      if (heartbeatInterval > 0)
      {
        String threadName = "Replication server RS("
            + this.getReplicationServerId()
            + ") heartbeat publisher to " + this.toString() + " at "
        String threadName = "Replication server RS(" + getReplicationServerId()
            + ") heartbeat publisher to " + this + " at "
            + session.getReadableRemoteAddress();
        heartbeatThread = new HeartbeatThread(threadName, session,
            heartbeatInterval / 3);
@@ -788,7 +781,7 @@
   */
  public boolean isReplicationServer()
  {
    return (!this.isDataServer());
    return !this.isDataServer();
  }
@@ -827,62 +820,58 @@
    // it will be created and locked later in the method
    if (!timedout)
    {
      // !timedout
      if (!replicationServerDomain.hasLock())
        replicationServerDomain.lock();
      return;
    }
    else
    /**
     * Take the lock on the domain.
     * WARNING: Here we try to acquire the lock with a timeout. This
     * is for preventing a deadlock that may happen if there are cross
     * connection attempts (for same domain) from this replication
     * server and from a peer one:
     * Here is the scenario:
     * - RS1 connect thread takes the domain lock and starts
     * connection to RS2
     * - at the same time RS2 connect thread takes his domain lock and
     * start connection to RS2
     * - RS2 listen thread starts processing received
     * ReplServerStartMsg from RS1 and wants to acquire the lock on
     * the domain (here) but cannot as RS2 connect thread already has
     * it
     * - RS1 listen thread starts processing received
     * ReplServerStartMsg from RS2 and wants to acquire the lock on
     * the domain (here) but cannot as RS1 connect thread already has
     * it
     * => Deadlock: 4 threads are locked.
     * So to prevent that in such situation, the listen threads here
     * will both timeout trying to acquire the lock. The random time
     * for the timeout should allow on connection attempt to be
     * aborted whereas the other one should have time to finish in the
     * same time.
     * Warning: the minimum time (3s) should be big enough to allow
     * normal situation connections to terminate. The added random
     * time should represent a big enough range so that the chance to
     * have one listen thread timing out a lot before the peer one is
     * great. When the first listen thread times out, the remote
     * connect thread should release the lock and allow the peer
     * listen thread to take the lock it was waiting for and process
     * the connection attempt.
     */
    Random random = new Random();
    int randomTime = random.nextInt(6); // Random from 0 to 5
    // Wait at least 3 seconds + (0 to 5 seconds)
    long timeout = 3000 + (randomTime * 1000);
    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
    if (!lockAcquired)
    {
      // timedout
      /**
       * Take the lock on the domain.
       * WARNING: Here we try to acquire the lock with a timeout. This
       * is for preventing a deadlock that may happen if there are cross
       * connection attempts (for same domain) from this replication
       * server and from a peer one:
       * Here is the scenario:
       * - RS1 connect thread takes the domain lock and starts
       * connection to RS2
       * - at the same time RS2 connect thread takes his domain lock and
       * start connection to RS2
       * - RS2 listen thread starts processing received
       * ReplServerStartMsg from RS1 and wants to acquire the lock on
       * the domain (here) but cannot as RS2 connect thread already has
       * it
       * - RS1 listen thread starts processing received
       * ReplServerStartMsg from RS2 and wants to acquire the lock on
       * the domain (here) but cannot as RS1 connect thread already has
       * it
       * => Deadlock: 4 threads are locked.
       * So to prevent that in such situation, the listen threads here
       * will both timeout trying to acquire the lock. The random time
       * for the timeout should allow on connection attempt to be
       * aborted whereas the other one should have time to finish in the
       * same time.
       * Warning: the minimum time (3s) should be big enough to allow
       * normal situation connections to terminate. The added random
       * time should represent a big enough range so that the chance to
       * have one listen thread timing out a lot before the peer one is
       * great. When the first listen thread times out, the remote
       * connect thread should release the lock and allow the peer
       * listen thread to take the lock it was waiting for and process
       * the connection attempt.
       */
      Random random = new Random();
      int randomTime = random.nextInt(6); // Random from 0 to 5
      // Wait at least 3 seconds + (0 to 5 seconds)
      long timeout = 3000 + (randomTime * 1000);
      boolean noTimeout = replicationServerDomain.tryLock(timeout);
      if (!noTimeout)
      {
        // Timeout
        Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
            getBaseDN(),
            serverId,
            session.getReadableRemoteAddress(),
            getReplicationServerId());
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
          getBaseDN(),
          serverId,
          session.getReadableRemoteAddress(),
          getReplicationServerId());
      throw new DirectoryException(ResultCode.OTHER, message);
    }
  }
@@ -1011,9 +1000,7 @@
      session.close();
    }
    /*
     * Stop the heartbeat thread.
     */
    // Stop the heartbeat thread.
    if (heartbeatThread != null)
    {
      heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
     */
    try
    {
      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
      if (writer != null && !Thread.currentThread().equals(writer))
      {
        writer.join(SHUTDOWN_JOIN_TIMEOUT);
      }
      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
      if (reader != null && !Thread.currentThread().equals(reader))
      {
        reader.join(SHUTDOWN_JOIN_TIMEOUT);
      }
@@ -1068,7 +1054,7 @@
      {
        // loop until not interrupted
      }
    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
    } while ((interrupted || !acquired) && !shutdownWriter);
    if (msg != null)
    {
      incrementOutCount();
@@ -1078,10 +1064,9 @@
        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
        {
          incrementAssuredSrSentUpdates();
        } else
        } else if (!isDataServer())
        {
          if (!isDataServer())
            incrementAssuredSdSentUpdates();
          incrementAssuredSdSentUpdates();
        }
      }
    }
@@ -1094,22 +1079,10 @@
   */
  public RSInfo toRSInfo()
  {
    return new RSInfo(serverId, serverURL, generationId, groupId,
      weight);
    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  protected void createMonitoringPublisher()
  {
    if (!replicationServerDomain.isRunningMonitoringPublisher())
    {
      replicationServerDomain.startMonitoringPublisher();
    }
  }
  /**
   * Update the send window size based on the credit specified in the
   * given window message.
   *
@@ -1132,11 +1105,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
        this.replicationServer.getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
        "\nAND REPLIED:\n" + outStartMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
          + "\nAND REPLIED:\n" + outStartMsg);
    }
  }
@@ -1151,12 +1123,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
        this.replicationServer.getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE SENT("+ this +
        "):\n" + outStartMsg.toString()+
        "\nAND RECEIVED:\n" + inStartMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
          + inStartMsg);
    }
  }
@@ -1171,11 +1141,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
          + outTopoMsg);
    }
  }
@@ -1190,11 +1159,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
          "\nAND RECEIVED:\n" + inTopoMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
          + inTopoMsg);
    }
  }
@@ -1209,11 +1177,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
          + "\nAND REPLIED:\n" + outTopoMsg);
    }
  }
@@ -1224,10 +1191,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
    }
  }
@@ -1240,11 +1206,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          inStartECLSessionMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
    }
  }
@@ -1264,10 +1228,9 @@
   */
  public long getReferenceGenId()
  {
    long refgenid = -1;
    if (replicationServerDomain!=null)
      refgenid = replicationServerDomain.getGenerationId();
    return refgenid;
    if (replicationServerDomain != null)
      return replicationServerDomain.getGenerationId();
    return -1;
  }
  /**
@@ -1285,8 +1248,7 @@
   * @param update the update message received.
   * @throws IOException when it occurs.
   */
  public void put(UpdateMsg update)
  throws IOException
  public void put(UpdateMsg update) throws IOException
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.put(update, this);
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -29,11 +29,12 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.types.DebugLogLevel;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
/**
 * This thread is in charge of periodically determining if the connected
@@ -85,7 +86,6 @@
  }
  /**
   * Run method for the StatusAnalyzer.
   * Analyzes if servers are late or not, and change their status accordingly.
   */
  @Override
@@ -93,13 +93,11 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Directory server status analyzer starting for dn " +
        replicationServerDomain.getBaseDn());
      TRACER.debugInfo(
          getMessage("Directory server status analyzer starting."));
    }
    final int localRsId = replicationServerDomain.getLocalRSServerId();
    boolean interrupted = false;
    while (!shutdown && !interrupted)
    while (!shutdown)
    {
      synchronized (shutdownLock)
      {
@@ -126,22 +124,21 @@
      // for it and change status accordingly if threshold value is
      // crossed/uncrossed
      for (DataServerHandler serverHandler :
        replicationServerDomain.getConnectedDSs(). values())
        replicationServerDomain.getConnectedDSs().values())
      {
        // Get number of pending changes for this server
        int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo("Status analyzer for dn "
              + replicationServerDomain.getBaseDn() + " DS "
          TRACER.debugInfo(getMessage("Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue. This is in RS " + localRsId);
              + " message(s) in writer queue."));
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // we are in a status that do not allows that, this will be noticed by
        // the changeStatusFromStatusAnalyzer method. This allows to take the
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (degradedStatusThreshold > 0)
          // Threshold value = 0 means no status analyzer (no degrading system)
@@ -151,39 +148,18 @@
        {
          if (nChanges >= degradedStatusThreshold)
          {
            if (serverHandler.getStatus() == ServerStatus.NORMAL_STATUS)
            if (serverHandler.getStatus() == NORMAL_STATUS
                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
            {
              interrupted =
                replicationServerDomain.changeStatusFromStatusAnalyzer(
                serverHandler,
                StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
              if (interrupted)
              {
                // Finish job and let thread die
                TRACER.debugInfo("Status analyzer for dn "
                    + replicationServerDomain.getBaseDn()
                    + " has been interrupted and will die. This is in RS "
                    + localRsId);
                break;
              }
              break;
            }
          } else
          }
          else
          {
            if (serverHandler.getStatus() == ServerStatus.DEGRADED_STATUS)
            if (serverHandler.getStatus() == DEGRADED_STATUS
                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
            {
              interrupted =
                replicationServerDomain.changeStatusFromStatusAnalyzer(
                serverHandler,
                StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
              if (interrupted)
              {
                // Finish job and let thread die
                TRACER.debugInfo("Status analyzer for dn "
                    + replicationServerDomain.getBaseDn()
                    + " has been interrupted and will die. This is in RS "
                    + localRsId);
                break;
              }
              break;
            }
          }
        }
@@ -191,9 +167,28 @@
    }
    done = true;
    TRACER.debugInfo("Status analyzer for dn "
        + replicationServerDomain.getBaseDn() + " is terminated."
        + " This is in RS " + localRsId);
    TRACER.debugInfo(getMessage("Status analyzer is terminated."));
  }
  private String getMessage(String message)
  {
    return "In RS " + replicationServerDomain.getLocalRSServerId()
        + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
        + message;
  }
  private boolean isInterrupted(DataServerHandler serverHandler,
      StatusMachineEvent event)
  {
    if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler,
        event))
    {
      // Finish job and let thread die
      TRACER.debugInfo(
          getMessage("Status analyzer has been interrupted and will die."));
      return true;
    }
    return false;
  }
  /**
@@ -208,9 +203,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("Shutting down status analyzer for dn "
            + replicationServerDomain.getBaseDn()
            + " in RS " + replicationServerDomain.getLocalRSServerId());
        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
      }
    }
  }
@@ -231,9 +224,7 @@
        n++;
        if (n >= FACTOR)
        {
          TRACER.debugInfo("Interrupting status analyzer for dn " +
              replicationServerDomain.getBaseDn() + " in RS " +
              replicationServerDomain.getLocalRSServerId());
          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
          interrupt();
        }
      }
@@ -251,9 +242,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Directory server status analyzer for dn " +
        replicationServerDomain.getBaseDn() + " changing threshold value to " +
        degradedStatusThreshold);
      TRACER.debugInfo(getMessage(
          "Directory server status analyzer changing threshold value to "
              + degradedStatusThreshold));
    }
    this.degradedStatusThreshold = degradedStatusThreshold;