From e5e4ea1dfa436ac42413a4d9b3b1279354b7cc3b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 16 May 2014 08:22:41 +0000
Subject: [PATCH] Fixing JEChangeNumberIndexDBTest random tests. JE was throwing exception when the thread accessing it had been interrupted which happens frequently on single core machines. The solution is to replace the use of Thread.sleep(long) + Thread.interrupt() with Object.wait(long) + Object.notify() on thread shutdown.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  106 ++++++++++++++++++++++++++++-------------------------
 1 files changed, 56 insertions(+), 50 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index f1af61f..96cf81f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -30,13 +30,14 @@
 import java.net.*;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
 import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*;
 import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
@@ -55,6 +56,7 @@
 import org.opends.server.replication.server.changelog.je.JEChangelogDB;
 import org.opends.server.types.*;
 import org.opends.server.util.ServerConstants;
+import org.opends.server.util.StaticUtils;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -89,24 +91,25 @@
       new HashMap<DN, ReplicationServerDomain>();
 
   private final ChangelogDB changelogDB;
-  private volatile boolean shutdown = false;
+  private final AtomicBoolean shutdown = new AtomicBoolean();
   private boolean stopListen = false;
-  private ReplSessionSecurity replSessionSecurity;
+  private final ReplSessionSecurity replSessionSecurity;
 
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
-  private static String eclWorkflowID =
+  private static final String eclWorkflowID =
     "External Changelog Workflow ID";
   private ECLWorkflowElement eclwe;
-  private AtomicReference<WorkflowImpl> eclWorkflowImpl =
+  private final AtomicReference<WorkflowImpl> eclWorkflowImpl =
       new AtomicReference<WorkflowImpl>();
 
   /**
    * This is required for unit testing, so that we can keep track of all the
    * replication servers which are running in the VM.
    */
-  private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>();
+  private static final Set<Integer> localPorts =
+      new CopyOnWriteArraySet<Integer>();
 
   // Monitors for synchronizing domain creation with the connect thread.
   private final Object domainTicketLock = new Object();
@@ -117,7 +120,7 @@
    * Holds the list of all replication servers instantiated in this VM.
    * This allows to perform clean up of the RS databases in unit tests.
    */
-  private static List<ReplicationServer> allInstances =
+  private static final List<ReplicationServer> allInstances =
     new ArrayList<ReplicationServer>();
 
   /**
@@ -184,21 +187,18 @@
    * ports from other replication servers or from LDAP servers
    * and spawn further thread responsible for handling those connections
    */
-
   void runListen()
   {
-    Message listenMsg = NOTE_REPLICATION_SERVER_LISTENING.get(
+    logError(NOTE_REPLICATION_SERVER_LISTENING.get(
         getServerId(),
         listenSocket.getInetAddress().getHostAddress(),
-        listenSocket.getLocalPort());
-    logError(listenMsg);
+        listenSocket.getLocalPort()));
 
-    while (!shutdown && !stopListen)
+    while (!shutdown.get() && !stopListen)
     {
       // Wait on the replicationServer port.
       // Read incoming messages and create LDAP or ReplicationServer listener
       // and Publisher.
-
       try
       {
         Session session;
@@ -212,14 +212,18 @@
           session = replSessionSecurity.createServerSession(newSocket,
               timeoutMS);
           if (session == null) // Error, go back to accept
+          {
             continue;
+          }
         }
         catch (Exception e)
         {
           // If problems happen during the SSL handshake, it is necessary
           // to close the socket to free the associated resources.
           if (newSocket != null)
+          {
             newSocket.close();
+          }
           continue;
         }
 
@@ -264,7 +268,7 @@
         {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
-        if (!shutdown)
+        if (!shutdown.get())
         {
           logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()));
         }
@@ -282,7 +286,7 @@
   {
     synchronized (connectThreadLock)
     {
-      while (!shutdown)
+      while (!shutdown.get())
       {
         HostPort localAddress = HostPort.localAddress(getReplicationPort());
         for (ReplicationServerDomain domain : getReplicationServerDomains())
@@ -359,8 +363,10 @@
     boolean sslEncryption = replSessionSecurity.isSslEncryption();
 
     if (debugEnabled())
+    {
       TRACER.debugInfo("RS " + getMonitorInstanceName() + " connects to "
           + remoteServerAddress);
+    }
 
     Socket socket = new Socket();
     Session session = null;
@@ -378,7 +384,9 @@
     catch (Exception e)
     {
       if (debugEnabled())
+      {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
       close(session);
       close(socket);
     }
@@ -389,7 +397,7 @@
    */
   private void initialize()
   {
-    shutdown = false;
+    shutdown.set(false);
 
     try
     {
@@ -401,14 +409,18 @@
 
       // creates working threads: we must first connect, then start to listen.
       if (debugEnabled())
-        TRACER.debugInfo("RS " +getMonitorInstanceName()+
-            " creates connect thread");
+      {
+        TRACER.debugInfo("RS " + getMonitorInstanceName()
+            + " creates connect thread");
+      }
       connectThread = new ReplicationServerConnectThread(this);
       connectThread.start();
 
       if (debugEnabled())
-        TRACER.debugInfo("RS " +getMonitorInstanceName()+
-            " creates listen thread");
+      {
+        TRACER.debugInfo("RS " + getMonitorInstanceName()
+            + " creates listen thread");
+      }
 
       listenThread = new ReplicationServerListenThread(this);
       listenThread.start();
@@ -423,8 +435,10 @@
       eclwe = new ECLWorkflowElement(this);
 
       if (debugEnabled())
-        TRACER.debugInfo("RS " +getMonitorInstanceName()+
-            " successfully initialized");
+      {
+        TRACER.debugInfo("RS " + getMonitorInstanceName()
+            + " successfully initialized");
+      }
     } catch (UnknownHostException e)
     {
       logError(ERR_UNKNOWN_HOSTNAME.get());
@@ -604,7 +618,7 @@
   /**
    * Waits for connections to this ReplicationServer.
    */
-  public void waitConnections()
+  void waitConnections()
   {
     // Acquire a domain ticket and wait for a complete cycle of the connect
     // thread.
@@ -626,7 +640,7 @@
     // Wait until the connect thread has processed next connect phase.
     synchronized (domainTicketLock)
     {
-      while (myDomainTicket > domainTicket && !shutdown)
+      while (myDomainTicket > domainTicket && !shutdown.get())
       {
         try
         {
@@ -649,10 +663,10 @@
   {
     localPorts.remove(getReplicationPort());
 
-    if (shutdown)
+    if (!shutdown.compareAndSet(false, true))
+    {
       return;
-
-    shutdown = true;
+    }
 
     // shutdown the connect thread
     if (connectThread != null)
@@ -660,19 +674,8 @@
       connectThread.interrupt();
     }
 
-    // shutdown the listener thread
-    try
-    {
-      if (listenSocket != null)
-      {
-        listenSocket.close();
-      }
-    } catch (IOException e)
-    {
-      // replication Server service is closing anyway.
-    }
-
     // shutdown the listen thread
+    StaticUtils.close(listenSocket);
     if (listenThread != null)
     {
       listenThread.interrupt();
@@ -744,9 +747,7 @@
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public ConfigChangeResult applyConfigurationChange(
       ReplicationServerCfg configuration)
@@ -779,7 +780,9 @@
       catch (ChangelogException e)
       {
         if (debugEnabled())
+        {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
         resultCode = ResultCode.OPERATIONS_ERROR;
       }
     }
@@ -805,13 +808,17 @@
       catch (IOException e)
       {
         if (debugEnabled())
+        {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
         logError(ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString()));
       }
       catch (InterruptedException e)
       {
         if (debugEnabled())
+        {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
         logError(ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString()));
       }
     }
@@ -898,9 +905,7 @@
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public boolean isConfigurationChangeAcceptable(
       ReplicationServerCfg configuration, List<Message> unacceptableReasons)
@@ -917,10 +922,8 @@
    */
   public long getGenerationId(DN baseDN)
   {
-    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
-    if (rsd!=null)
-      return rsd.getGenerationId();
-    return -1;
+    final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
+    return rsd != null ? rsd.getGenerationId() : -1;
   }
 
   /**
@@ -941,8 +944,9 @@
   public void remove()
   {
     if (debugEnabled())
+    {
       TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
-
+    }
     shutdown();
   }
 
@@ -1025,7 +1029,9 @@
     }
 
     if (serversToDisconnect.isEmpty())
+    {
       return;
+    }
 
     for (ReplicationServerDomain domain: getReplicationServerDomains())
     {

--
Gitblit v1.10.0