mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
09.43.2013 42bfb9888a5a85352e7316e8c47e6a67441522f0
Fixing thread leaks in Continuous Integration.

ReplicationServerDomain.java:
Fixed NPEs in stopStatusAnalyzer() and stopMonitoringPublisher().
In getEligibleCSN(), removed useless null checks.

ServerWriter.java:
Code cleanup.
2 files modified
63 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2456,7 +2456,7 @@
  private void stopStatusAnalyzer()
  {
    final StatusAnalyzer thread = statusAnalyzer.get();
    if (statusAnalyzer.compareAndSet(thread, null))
    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
@@ -2485,7 +2485,7 @@
  private void stopMonitoringPublisher()
  {
    final MonitoringPublisher thread = monitoringPublisher.get();
    if (monitoringPublisher.compareAndSet(thread, null))
    if (thread != null && monitoringPublisher.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
@@ -2629,14 +2629,11 @@
        continue;
      }
      if (replicaNewestCSN != null
          && (eligibleCSN == null ||
              replicaNewestCSN.isNewerThan(eligibleCSN)))
      if (eligibleCSN == null || replicaNewestCSN.isNewerThan(eligibleCSN))
      {
        eligibleCSN = replicaNewestCSN;
      }
      if (heartbeatLastCSN != null
          && (eligibleCSN == null || heartbeatLastCSN.isNewerThan(eligibleCSN)))
      if (heartbeatLastCSN != null && heartbeatLastCSN.isNewerThan(eligibleCSN))
      {
        eligibleCSN = heartbeatLastCSN;
      }
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.net.SocketException;
import java.util.NoSuchElementException;
@@ -42,6 +37,10 @@
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a server writer, which is used to send changes to a
@@ -75,13 +74,9 @@
      ReplicationServerDomain replicationServerDomain)
  {
    // Session may be null for ECLServerWriter.
    super("Replication server RS("
        + handler.getReplicationServerId()
        + ") writing to "
        + handler.toString()
        + " at "
        + ((session != null) ? session.getReadableRemoteAddress()
            : "unknown"));
    super("Replication server RS(" + handler.getReplicationServerId()
        + ") writing to " + handler + " at "
        + (session != null ? session.getReadableRemoteAddress() : "unknown"));
    this.session = session;
    this.handler = handler;
@@ -96,11 +91,12 @@
  @Override
  public void run()
  {
    Message errMessage = null;
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " starting");
      TRACER.debugInfo(getName() + " starting");
    }
    Message errMessage = null;
    try
    {
      while (true)
@@ -129,8 +125,8 @@
           * mode (most of the time).
           */
          ServerStatus dsStatus = handler.getStatus();
          if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
            (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
              || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
          {
            long referenceGenerationId =
              replicationServerDomain.getGenerationId();
@@ -158,8 +154,9 @@
           */
          long referenceGenerationId =
            replicationServerDomain.getGenerationId();
          if ((referenceGenerationId != handler.getGenerationId()) ||
            (referenceGenerationId == -1) || (handler.getGenerationId() == -1))
          if (referenceGenerationId != handler.getGenerationId()
              || referenceGenerationId == -1
              || handler.getGenerationId() == -1)
          {
            logError(
                WARN_IGNORING_UPDATE_TO_RS.get(
@@ -174,21 +171,6 @@
          }
        }
        /*
        if (debugEnabled())
        {
          TRACER.debugInfo(
            "In " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
            ", writer to " + this.handler.getMonitorInstanceName() +
            " publishes msg=[" + update.toString() + "]"+
            " refgenId=" + referenceGenerationId +
            " isAssured=" + update.isAssured() +
            " server=" + handler.getServerId() +
            " generationId=" + handler.getGenerationId());
        }
        */
        // Publish the update to the remote server using a protocol version he
        // it supports
        session.publish(update);
@@ -241,7 +223,7 @@
       * An unexpected error happened.
       * Log an error and close the connection.
       */
      errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
      errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler +
                        " " +  stackTraceToSingleLineString(e));
      logError(errMessage);
    }
@@ -250,7 +232,7 @@
      replicationServerDomain.stopServer(handler, false);
      if (debugEnabled())
      {
        TRACER.debugInfo(this.getName() + " stopped " + errMessage);
        TRACER.debugInfo(getName() + " stopped " + errMessage);
      }
    }
  }