mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
16.22.2014 e5e4ea1dfa436ac42413a4d9b3b1279354b7cc3b
Fixing JEChangeNumberIndexDBTest random tests.
JE was throwing an exception when the thread accessing it had been interrupted, which happens frequently on single-core machines.
The solution is to replace the use of Thread.sleep(long) + Thread.interrupt() with Object.wait(long) + Object.notify() on thread shutdown.



JEChangelogDB.java:
Replaces the use of Thread.sleep(long) + Thread.interrupt() with Object.wait(long) + Object.notify() on thread shutdown.
Created method jeFriendlySleep(long) to emulate Thread.sleep(long).

ReplicationServer.java:
Code cleanup.
Changed shutdown field from boolean to AtomicBoolean.
Added final keyword to fields.
Reduced visibility of waitConnections() from public to default.
In shutdown(), used StaticUtils.close().
2 files modified
159 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 106 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 53 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -30,13 +30,14 @@
import java.net.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*;
import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
@@ -55,6 +56,7 @@
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.StaticUtils;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ReplicationMessages.*;
@@ -89,24 +91,25 @@
      new HashMap<DN, ReplicationServerDomain>();
  private final ChangelogDB changelogDB;
  private volatile boolean shutdown = false;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  private final ReplSessionSecurity replSessionSecurity;
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  private static String eclWorkflowID =
  private static final String eclWorkflowID =
    "External Changelog Workflow ID";
  private ECLWorkflowElement eclwe;
  private AtomicReference<WorkflowImpl> eclWorkflowImpl =
  private final AtomicReference<WorkflowImpl> eclWorkflowImpl =
      new AtomicReference<WorkflowImpl>();
  /**
   * This is required for unit testing, so that we can keep track of all the
   * replication servers which are running in the VM.
   */
  private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>();
  private static final Set<Integer> localPorts =
      new CopyOnWriteArraySet<Integer>();
  // Monitors for synchronizing domain creation with the connect thread.
  private final Object domainTicketLock = new Object();
@@ -117,7 +120,7 @@
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static List<ReplicationServer> allInstances =
  private static final List<ReplicationServer> allInstances =
    new ArrayList<ReplicationServer>();
  /**
@@ -184,21 +187,18 @@
   * ports from other replication servers or from LDAP servers
   * and spawn further thread responsible for handling those connections
   */
  void runListen()
  {
    Message listenMsg = NOTE_REPLICATION_SERVER_LISTENING.get(
    logError(NOTE_REPLICATION_SERVER_LISTENING.get(
        getServerId(),
        listenSocket.getInetAddress().getHostAddress(),
        listenSocket.getLocalPort());
    logError(listenMsg);
        listenSocket.getLocalPort()));
    while (!shutdown && !stopListen)
    while (!shutdown.get() && !stopListen)
    {
      // Wait on the replicationServer port.
      // Read incoming messages and create LDAP or ReplicationServer listener
      // and Publisher.
      try
      {
        Session session;
@@ -212,14 +212,18 @@
          session = replSessionSecurity.createServerSession(newSocket,
              timeoutMS);
          if (session == null) // Error, go back to accept
          {
            continue;
          }
        }
        catch (Exception e)
        {
          // If problems happen during the SSL handshake, it is necessary
          // to close the socket to free the associated resources.
          if (newSocket != null)
          {
            newSocket.close();
          }
          continue;
        }
@@ -264,7 +268,7 @@
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        if (!shutdown)
        if (!shutdown.get())
        {
          logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()));
        }
@@ -282,7 +286,7 @@
  {
    synchronized (connectThreadLock)
    {
      while (!shutdown)
      while (!shutdown.get())
      {
        HostPort localAddress = HostPort.localAddress(getReplicationPort());
        for (ReplicationServerDomain domain : getReplicationServerDomains())
@@ -359,8 +363,10 @@
    boolean sslEncryption = replSessionSecurity.isSslEncryption();
    if (debugEnabled())
    {
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " connects to "
          + remoteServerAddress);
    }
    Socket socket = new Socket();
    Session session = null;
@@ -378,7 +384,9 @@
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      close(session);
      close(socket);
    }
@@ -389,7 +397,7 @@
   */
  private void initialize()
  {
    shutdown = false;
    shutdown.set(false);
    try
    {
@@ -401,14 +409,18 @@
      // creates working threads: we must first connect, then start to listen.
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect thread");
      {
        TRACER.debugInfo("RS " + getMonitorInstanceName()
            + " creates connect thread");
      }
      connectThread = new ReplicationServerConnectThread(this);
      connectThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen thread");
      {
        TRACER.debugInfo("RS " + getMonitorInstanceName()
            + " creates listen thread");
      }
      listenThread = new ReplicationServerListenThread(this);
      listenThread.start();
@@ -423,8 +435,10 @@
      eclwe = new ECLWorkflowElement(this);
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
      {
        TRACER.debugInfo("RS " + getMonitorInstanceName()
            + " successfully initialized");
      }
    } catch (UnknownHostException e)
    {
      logError(ERR_UNKNOWN_HOSTNAME.get());
@@ -604,7 +618,7 @@
  /**
   * Waits for connections to this ReplicationServer.
   */
  public void waitConnections()
  void waitConnections()
  {
    // Acquire a domain ticket and wait for a complete cycle of the connect
    // thread.
@@ -626,7 +640,7 @@
    // Wait until the connect thread has processed next connect phase.
    synchronized (domainTicketLock)
    {
      while (myDomainTicket > domainTicket && !shutdown)
      while (myDomainTicket > domainTicket && !shutdown.get())
      {
        try
        {
@@ -649,10 +663,10 @@
  {
    localPorts.remove(getReplicationPort());
    if (shutdown)
    if (!shutdown.compareAndSet(false, true))
    {
      return;
    shutdown = true;
    }
    // shutdown the connect thread
    if (connectThread != null)
@@ -660,19 +674,8 @@
      connectThread.interrupt();
    }
    // shutdown the listener thread
    try
    {
      if (listenSocket != null)
      {
        listenSocket.close();
      }
    } catch (IOException e)
    {
      // replication Server service is closing anyway.
    }
    // shutdown the listen thread
    StaticUtils.close(listenSocket);
    if (listenThread != null)
    {
      listenThread.interrupt();
@@ -744,9 +747,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationServerCfg configuration)
@@ -779,7 +780,9 @@
      catch (ChangelogException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        resultCode = ResultCode.OPERATIONS_ERROR;
      }
    }
@@ -805,13 +808,17 @@
      catch (IOException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString()));
      }
      catch (InterruptedException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString()));
      }
    }
@@ -898,9 +905,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationServerCfg configuration, List<Message> unacceptableReasons)
@@ -917,10 +922,8 @@
   */
  public long getGenerationId(DN baseDN)
  {
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    if (rsd!=null)
      return rsd.getGenerationId();
    return -1;
    final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    return rsd != null ? rsd.getGenerationId() : -1;
  }
  /**
@@ -941,8 +944,9 @@
  public void remove()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
    }
    shutdown();
  }
@@ -1025,7 +1029,9 @@
    }
    if (serversToDisconnect.isEmpty())
    {
      return;
    }
    for (ReplicationServerDomain domain: getReplicationServerDomains())
    {
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -385,13 +385,11 @@
    if (indexer != null)
    {
      indexer.initiateShutdown();
      indexer.interrupt();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
      purger.interrupt();
    }
    try
@@ -838,17 +836,14 @@
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              if (!isShutdownInitiated())
              {
                // ... or the change number index DB is empty,
                // wait for new changes to come in.
              // ... or the change number index DB is empty,
              // wait for new changes to come in.
                // Note we cannot sleep for as long as the purge delay
                // (3 days default), because we might receive late updates
                // that will have to be purged before the purge delay elapses.
                // This can particularly happen in case of network partitions.
                sleep(DEFAULT_SLEEP);
              }
              // Note we cannot sleep for as long as the purge delay
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              continue;
            }
          }
@@ -864,7 +859,7 @@
          latestPurgeDate = purgeTimestamp;
          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
@@ -882,6 +877,33 @@
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
@@ -900,7 +922,10 @@
    public void initiateShutdown()
    {
      super.initiateShutdown();
      this.interrupt(); // wake up the purger thread for faster shutdown
      synchronized (this)
      {
        notify(); // wake up the purger thread for faster shutdown
      }
    }
  }
}