From 296090552c8bb9ad83cfa819fbf0bd9d0ba8fac2 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 18 Apr 2011 15:55:45 +0000
Subject: [PATCH] Fix OpenDJ-117: IllegalMonitorStateException during server shutdown
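
The exception can be thrown when release() is called for the domain
lock on a code path where the lock was never actually acquired:
several methods caught the InterruptedException raised while taking
the lock, ignored it ("Try doing job anyway...") and carried on to a
finally block that released the lock.

This change restructures the locking so that the lock is acquired
before entering the try/finally that releases it. When the
acquisition is interrupted, the method re-interrupts the current
thread and returns instead of continuing without the lock. Related
changes:

- ServerHandler.lockDomain() now propagates InterruptedException
  instead of swallowing it and logging an error.
- ReplicationServerDomain.createLocalTopologyMonitorMsg() returns
  null when interrupted while acquiring the lock, and only releases
  the lock after a successful acquisition.
- Missing returns are added after abortStart() in DataServerHandler
  and ECLServerHandler so that the handshake code no longer falls
  through after a protocol error.
- ReplicationServerHandler now throws a DirectoryException rather
  than calling abortStart() when an unexpected message is received
  while waiting for the remote TopologyMsg.
- The domain lock field is made final, and two "Finih" typos are
  fixed in StatusAnalyzer.

The snippet below is a minimal, self-contained sketch of the
corrected locking pattern; the class and method names are
illustrative only, and the real code goes through the domain's
lock()/release() helpers rather than using a ReentrantLock directly.

    import java.util.concurrent.locks.ReentrantLock;

    public class DomainLockSketch
    {
      private final ReentrantLock lock = new ReentrantLock();

      public void stopServer()
      {
        try
        {
          // Acquire the lock *before* entering the try/finally that
          // releases it.
          lock.lockInterruptibly();
        }
        catch (InterruptedException ex)
        {
          // We cannot handle the interrupt here, so restore the
          // interrupt flag so that it is noticed by subsequent
          // blocking calls, and give up.
          Thread.currentThread().interrupt();
          return;
        }

        try
        {
          // ... work that requires the domain lock ...
        }
        finally
        {
          // Safe: this point is only reached if the acquisition
          // above succeeded, so unlock() cannot throw
          // IllegalMonitorStateException.
          lock.unlock();
        }
      }
    }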

---
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java        |    1 
 opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java |    7 
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java         |   37 ++--
 opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java           |    4 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java            |  132 ++++++++--------
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java  |  261 ++++++++++++++++++--------------
 6 files changed, 235 insertions(+), 207 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index a32519c..c0f67f9 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -706,6 +706,7 @@
       Message message = Message.raw(
           "Protocol error: StartSessionMsg required." + msg + " received.");
       abortStart(message);
+      return null;
     }
 
     // Process StartSessionMsg sent by remote DS
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index b0df1e3..624c058 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -486,13 +486,13 @@
       StartECLSessionMsg inStartECLSessionMsg =
         waitAndProcessStartSessionECLFromRemoteServer();
       if (inStartECLSessionMsg == null)
-        {
-          // client wants to properly close the connection (client sent a
-          // StopMsg)
-          logStopReceived();
-          abortStart(null);
-          return;
-        }
+      {
+        // client wants to properly close the connection (client sent a
+        // StopMsg)
+        logStopReceived();
+        abortStart(null);
+        return;
+      }
 
       logStartECLSessionHandshake(inStartECLSessionMsg);
 
@@ -539,17 +539,20 @@
       // client wants to stop handshake (was just for handshake phase one for RS
       // choice). Return null to make the session be terminated.
       return null;
-    } else if (!(msg instanceof StartECLSessionMsg))
-    {
-      Message message = Message.raw(
-          "Protocol error: StartECLSessionMsg required." + msg + " received.");
-      abortStart(message);
     }
-
-    // Process StartSessionMsg sent by remote DS
-    StartECLSessionMsg startECLSessionMsg = (StartECLSessionMsg) msg;
-
-    return startECLSessionMsg;
+    else if (!(msg instanceof StartECLSessionMsg))
+    {
+      Message message = Message
+          .raw("Protocol error: StartECLSessionMsg required." + msg
+              + " received.");
+      abortStart(message);
+      return null;
+    }
+    else
+    {
+      // Process StartSessionMsg sent by remote DS
+      return (StartECLSessionMsg) msg;
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d147ea4..64ac53c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1064,22 +1064,25 @@
     if (!handler.engageShutdown())
       // Only do this once (prevent other thread to enter here again)
     {
-      try
+      if (!shutdown)
       {
         try
         {
-
           // Acquire lock on domain (see more details in comment of start()
           // method of ServerHandler)
-          if (!shutdown)
-          {
-            lock();
-          }
-        } catch (InterruptedException ex)
-        {
-          // Try doing job anyway...
+          lock();
         }
+        catch (InterruptedException ex)
+        {
+          // We can't deal with this here, so re-interrupt thread so that it is
+          // caught during subsequent IO.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
 
+      try
+      {
         // Stop useless monitoring publisher if no more RS or DS in domain
         if ( (directoryServers.size() + replicationServers.size() )== 1)
         {
@@ -1179,15 +1182,20 @@
     {
       try
       {
-        try
-        {
-          // Acquire lock on domain (see more details in comment of start()
-          // method of ServerHandler)
-          lock();
-        } catch (InterruptedException ex)
-        {
-          // Try doing job anyway...
-        }
+        // Acquire lock on domain (see more details in comment of start() method
+        // of ServerHandler)
+        lock();
+      }
+      catch (InterruptedException ex)
+      {
+        // We can't deal with this here, so re-interrupt thread so that it is
+        // caught during subsequent IO.
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      try
+      {
         if (otherHandlers.contains(handler))
         {
           unRegisterHandler(handler);
@@ -1779,28 +1787,35 @@
     return returnMsg;
   }
 
+
+
   /**
    * Creates a new monitor message including monitoring information for the
-   * topology directly connected to this RS. This includes information for:
-   * - local RS
-   * - all direct DSs
-   * - all direct RSs
-   * @param sender The sender of this message.
-   * @param destination The destination of this message.
-   * @return The newly created and filled MonitorMsg. Null if a problem occurred
-   * during message creation.
+   * topology directly connected to this RS. This includes information for the
+   * local RS, all direct DSs, and all direct RSs.
+   *
+   * @param sender
+   *          The sender of this message.
+   * @param destination
+   *          The destination of this message.
+   * @return The newly created and filled MonitorMsg. Null if the current thread
+   *         was interrupted while attempting to get the domain lock.
    */
   public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
   {
-    MonitorMsg monitorMsg = null;
-
-    try {
-
+    try
+    {
       // Lock domain as we need to go through connected servers list
       lock();
+    }
+    catch (InterruptedException e)
+    {
+      return null;
+    }
 
-      monitorMsg = new MonitorMsg(sender, destination);
-
+    try
+    {
+      MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
 
       // Populate for each connected LDAP Server
       // from the states stored in the serverHandler.
@@ -1808,35 +1823,27 @@
       // - the older missing change
       for (DataServerHandler lsh : this.directoryServers.values())
       {
-        monitorMsg.setServerState(
-          lsh.getServerId(),
-          lsh.getServerState(),
-          lsh.getApproxFirstMissingDate(),
-          true);
+        monitorMsg.setServerState(lsh.getServerId(),
+            lsh.getServerState(), lsh.getApproxFirstMissingDate(),
+            true);
       }
 
       // Same for the connected RS
       for (ReplicationServerHandler rsh : this.replicationServers.values())
       {
-        monitorMsg.setServerState(
-          rsh.getServerId(),
-          rsh.getServerState(),
-          rsh.getApproxFirstMissingDate(),
-          false);
+        monitorMsg.setServerState(rsh.getServerId(),
+            rsh.getServerState(), rsh.getApproxFirstMissingDate(),
+            false);
       }
 
       // Populate the RS state in the msg from the DbState
       monitorMsg.setReplServerDbState(this.getDbServerState());
-    } catch(InterruptedException e)
-    {
-      // At lock, too bad...
-    } finally
-    {
-      if (hasLock())
-        release();
+      return monitorMsg;
     }
-
-    return monitorMsg;
+    finally
+    {
+      release();
+    }
   }
 
   /**
@@ -2126,18 +2133,23 @@
           "In " + this +
           " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
           " for baseDn " + baseDn + ":\n" + genIdMsg);
+
     try
     {
-      try
-      {
-        // Acquire lock on domain (see more details in comment of start() method
-        // of ServerHandler)
-        lock();
-      } catch (InterruptedException ex)
-      {
-        // Try doing job anyway...
-      }
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    }
+    catch (InterruptedException ex)
+    {
+      // We can't deal with this here, so re-interrupt thread so that it is
+      // caught during subsequent IO.
+      Thread.currentThread().interrupt();
+      return;
+    }
 
+    try
+    {
       long newGenId = genIdMsg.getGenerationId();
 
       if (newGenId != this.generationId)
@@ -2233,16 +2245,20 @@
 
     try
     {
-      try
-      {
-        // Acquire lock on domain (see more details in comment of start() method
-        // of ServerHandler)
-        lock();
-      } catch (InterruptedException ex)
-      {
-        // Try doing job anyway...
-      }
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    }
+    catch (InterruptedException ex)
+    {
+      // We can't deal with this here, so re-interrupt thread so that it is
+      // caught during subsequent IO.
+      Thread.currentThread().interrupt();
+      return;
+    }
 
+    try
+    {
       ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
       if (newStatus == ServerStatus.INVALID_STATUS)
       {
@@ -2258,7 +2274,6 @@
       Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
           senderHandler.getServerId(), baseDn, newStatus.toString());
       logError(message);
-
     }
     catch(Exception e)
     {
@@ -2278,17 +2293,16 @@
    * @param event The event to be used for new status computation
    * @return True if we have been interrupted (must stop), false otherwise
    */
-  public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
-    StatusMachineEvent event)
+  public boolean changeStatusFromStatusAnalyzer(
+      DataServerHandler serverHandler, StatusMachineEvent event)
   {
     try
     {
-    try
-    {
       // Acquire lock on domain (see more details in comment of start() method
       // of ServerHandler)
       lock();
-    } catch (InterruptedException ex)
+    }
+    catch (InterruptedException ex)
     {
       // We have been interrupted for dying, from stopStatusAnalyzer
       // to prevent deadlock in this situation:
@@ -2299,43 +2313,50 @@
       // waiting for analyzer thread death, a deadlock occurs. So we force
       // interruption of the status analyzer thread death after 2 seconds if
       // it has not finished (see StatusAnalyzer.waitForShutdown). This allows
-      // to have the analyzer thread taking the domain lock only when the status
-      // of a DS has to be changed. See more comments in run method of
+      // to have the analyzer thread taking the domain lock only when the
+      // status of a DS has to be changed. See more comments in run method of
       // StatusAnalyzer.
       if (debugEnabled())
-        TRACER.debugInfo(
-          "Status analyzer for domain " + baseDn + " has been interrupted when"
-          + " trying to acquire domain lock for changing the status of DS " +
-          serverHandler.getServerId());
+        TRACER
+            .debugInfo("Status analyzer for domain "
+                + baseDn
+                + " has been interrupted when"
+                + " trying to acquire domain lock for changing the status"
+                + " of DS "
+                + serverHandler.getServerId());
       return true;
     }
 
-    ServerStatus newStatus = ServerStatus.INVALID_STATUS;
-    ServerStatus oldStatus = serverHandler.getStatus();
     try
     {
-      newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
-    } catch (IOException e)
-    {
-      logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER.get(baseDn.
-        toString(),
-        Integer.toString(serverHandler.getServerId()),
-        e.getMessage()));
-    }
+      ServerStatus newStatus = ServerStatus.INVALID_STATUS;
+      ServerStatus oldStatus = serverHandler.getStatus();
+      try
+      {
+        newStatus = serverHandler
+            .changeStatusFromStatusAnalyzer(event);
+      }
+      catch (IOException e)
+      {
+        logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
+            .get(baseDn.toString(),
+                Integer.toString(serverHandler.getServerId()),
+                e.getMessage()));
+      }
 
-    if ( (newStatus == ServerStatus.INVALID_STATUS) ||
-      (newStatus == oldStatus) )
-    {
-      // Change was impossible or already occurred (see StatusAnalyzer comments)
-      return false;
-    }
+      if ((newStatus == ServerStatus.INVALID_STATUS)
+          || (newStatus == oldStatus))
+      {
+        // Change was impossible or already occurred (see StatusAnalyzer
+        // comments)
+        return false;
+      }
 
-    // Update every peers (RS/DS) with topology changes
-    buildAndSendTopoInfoToDSs(serverHandler);
-    buildAndSendTopoInfoToRSs();
-
+      // Update every peers (RS/DS) with topology changes
+      buildAndSendTopoInfoToDSs(serverHandler);
+      buildAndSendTopoInfoToRSs();
     }
-    catch(Exception e)
+    catch (Exception e)
     {
       logError(Message.raw(Category.SYNC, Severity.NOTICE,
           stackTraceToSingleLineString(e)));
@@ -2344,6 +2365,7 @@
     {
       release();
     }
+
     return false;
   }
 
@@ -2456,11 +2478,14 @@
     {
       // Acquire lock on domain (see more details in comment of start() method
       // of ServerHandler)
-      if (!hasLock())
-        lock();
-    } catch (InterruptedException ex)
+      lock();
+    }
+    catch (InterruptedException ex)
     {
-      // Try doing job anyway...
+      // We can't deal with this here, so re-interrupt thread so that it is
+      // caught during subsequent IO.
+      Thread.currentThread().interrupt();
+      return;
     }
 
     try
@@ -2502,7 +2527,6 @@
        * DS we have.
        */
       buildAndSendTopoInfoToDSs(null);
-
     }
     catch(Exception e)
     {
@@ -2867,7 +2891,7 @@
    * - when creating and sending a TopologyMsg
    * - when a DS status is changing (ChangeStatusMsg received or sent)...
    */
-  private ReentrantLock lock = new ReentrantLock();
+  private final ReentrantLock lock = new ReentrantLock();
 
   /**
    * This lock is used to protect the generationid variable.
@@ -3344,9 +3368,13 @@
       // Acquire lock on domain (see more details in comment of start() method
       // of ServerHandler)
       lock();
-    } catch (InterruptedException ex)
+    }
+    catch (InterruptedException ex)
     {
-      // Try doing job anyway...
+      // We can't deal with this here, so re-interrupt thread so that it is
+      // caught during subsequent IO.
+      Thread.currentThread().interrupt();
+      return;
     }
 
     try
@@ -3356,7 +3384,8 @@
       {
         // If we are the first replication server warned,
         // then forwards the message to the remote replication servers
-        for (ReplicationServerHandler rsHandler : replicationServers.values())
+        for (ReplicationServerHandler rsHandler : replicationServers
+            .values())
         {
           try
           {
@@ -3365,12 +3394,14 @@
             {
               rsHandler.send(msg);
             }
-          } catch (IOException e)
+          }
+          catch (IOException e)
           {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
-                "Replication Server " + replicationServer.getReplicationPort() +
-                " " + baseDn + " " + replicationServer.getServerId()));
+            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
+                .get("Replication Server "
+                    + replicationServer.getReplicationPort() + " "
+                    + baseDn + " " + replicationServer.getServerId()));
             stopServer(rsHandler, false);
           }
         }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 3c35331..9b0ac2b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -178,7 +178,6 @@
 
     try
     {
-      //
       lockDomain(false); // no timeout
 
       // Send start
@@ -260,15 +259,11 @@
       logError(message);
 
       super.finalizeStart();
-
     }
     catch(IOException ioe)
     {
       // FIXME receive
     }
-    // catch(DirectoryException de)
-    //{ already logged
-    //
     catch(Exception e)
     {
       // FIXME more detailed exceptions
@@ -528,7 +523,7 @@
             msg.getClass().getCanonicalName(),
             "TopologyMsg");
       }
-      abortStart(message);
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
 
     // Remote RS sent his topo msg
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index b00939a..c3e2b86 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -832,13 +832,20 @@
     return (!this.isDataServer());
   }
 
+
+
   /**
    * Lock the domain potentially with a timeout.
-   * @param timedout The provided timeout.
-   * @throws DirectoryException When an exception occurs.
+   *
+   * @param timedout
+   *          Whether the lock should be acquired using a timeout.
+   * @throws DirectoryException
+   *           When an exception occurs.
+   * @throws InterruptedException
+   *           If the current thread was interrupted while waiting for the lock.
    */
   protected void lockDomain(boolean timedout)
-  throws DirectoryException
+    throws DirectoryException, InterruptedException
   {
     // The handshake phase must be done by blocking any access to structures
     // keeping info on connected servers, so that one can safely check for
@@ -859,73 +866,64 @@
 
     // If domain already exists, lock it until handshake is finished otherwise
     // it will be created and locked later in the method
-    try
+    if (!timedout)
     {
-      if (!timedout)
-      {
-        // !timedout
-        if (!replicationServerDomain.hasLock())
-          replicationServerDomain.lock();
-      }
-      else
-      {
-        // timedout
-        /**
-         * Take the lock on the domain.
-         * WARNING: Here we try to acquire the lock with a timeout. This
-         * is for preventing a deadlock that may happen if there are cross
-         * connection attempts (for same domain) from this replication
-         * server and from a peer one:
-         * Here is the scenario:
-         * - RS1 connect thread takes the domain lock and starts
-         * connection to RS2
-         * - at the same time RS2 connect thread takes his domain lock and
-         * start connection to RS2
-         * - RS2 listen thread starts processing received
-         * ReplServerStartMsg from RS1 and wants to acquire the lock on
-         * the domain (here) but cannot as RS2 connect thread already has
-         * it
-         * - RS1 listen thread starts processing received
-         * ReplServerStartMsg from RS2 and wants to acquire the lock on
-         * the domain (here) but cannot as RS1 connect thread already has
-         * it
-         * => Deadlock: 4 threads are locked.
-         * So to prevent that in such situation, the listen threads here
-         * will both timeout trying to acquire the lock. The random time
-         * for the timeout should allow on connection attempt to be
-         * aborted whereas the other one should have time to finish in the
-         * same time.
-         * Warning: the minimum time (3s) should be big enough to allow
-         * normal situation connections to terminate. The added random
-         * time should represent a big enough range so that the chance to
-         * have one listen thread timing out a lot before the peer one is
-         * great. When the first listen thread times out, the remote
-         * connect thread should release the lock and allow the peer
-         * listen thread to take the lock it was waiting for and process
-         * the connection attempt.
-         */
-        Random random = new Random();
-        int randomTime = random.nextInt(6); // Random from 0 to 5
-        // Wait at least 3 seconds + (0 to 5 seconds)
-        long timeout = (long) (3000 + ( randomTime * 1000 ) );
-        boolean noTimeout = replicationServerDomain.tryLock(timeout);
-        if (!noTimeout)
-        {
-          // Timeout
-          Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
-              getServiceId(),
-              serverId,
-              session.getReadableRemoteAddress(),
-              replicationServerId);
-          throw new DirectoryException(ResultCode.OTHER, message);
-        }
-      }
+      // !timedout
+      if (!replicationServerDomain.hasLock())
+        replicationServerDomain.lock();
     }
-    catch (InterruptedException e)
+    else
     {
-      // Thread interrupted
-      Message message = ERR_EXCEPTION_LOCKING_RS_DOMAIN.get(e.getMessage());
-      logError(message);
+      // timedout
+      /**
+       * Take the lock on the domain.
+       * WARNING: Here we try to acquire the lock with a timeout. This
+       * is for preventing a deadlock that may happen if there are cross
+       * connection attempts (for same domain) from this replication
+       * server and from a peer one:
+       * Here is the scenario:
+       * - RS1 connect thread takes the domain lock and starts
+       * connection to RS2
+       * - at the same time RS2 connect thread takes its domain lock and
+       * starts connection to RS1
+       * - RS2 listen thread starts processing received
+       * ReplServerStartMsg from RS1 and wants to acquire the lock on
+       * the domain (here) but cannot as RS2 connect thread already has
+       * it
+       * - RS1 listen thread starts processing received
+       * ReplServerStartMsg from RS2 and wants to acquire the lock on
+       * the domain (here) but cannot as RS1 connect thread already has
+       * it
+       * => Deadlock: 4 threads are locked.
+       * So to prevent that in such situation, the listen threads here
+       * will both timeout trying to acquire the lock. The random time
+       * for the timeout should allow one connection attempt to be
+       * aborted whereas the other one should have time to finish in the
+       * same time.
+       * Warning: the minimum time (3s) should be big enough to allow
+       * normal situation connections to terminate. The added random
+       * time should represent a big enough range so that the chance to
+       * have one listen thread timing out a lot before the peer one is
+       * great. When the first listen thread times out, the remote
+       * connect thread should release the lock and allow the peer
+       * listen thread to take the lock it was waiting for and process
+       * the connection attempt.
+       */
+      Random random = new Random();
+      int randomTime = random.nextInt(6); // Random from 0 to 5
+      // Wait at least 3 seconds + (0 to 5 seconds)
+      long timeout = (long) (3000 + ( randomTime * 1000 ) );
+      boolean noTimeout = replicationServerDomain.tryLock(timeout);
+      if (!noTimeout)
+      {
+        // Timeout
+        Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+            getServiceId(),
+            serverId,
+            session.getReadableRemoteAddress(),
+            replicationServerId);
+        throw new DirectoryException(ResultCode.OTHER, message);
+      }
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 111518d..05d97d0 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -155,7 +155,7 @@
                 StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
               if (interrupted)
               {
-                // Finih job and let thread die
+                // Finish job and let thread die
                 TRACER.debugInfo("Status analyzer for dn " +
                   replicationServerDomain.getBaseDn().toString() +
                   " has been interrupted and will die. This is in RS " +
@@ -173,7 +173,7 @@
                 StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
               if (interrupted)
               {
-                // Finih job and let thread die
+                // Finish job and let thread die
                 TRACER.debugInfo("Status analyzer for dn " +
                   replicationServerDomain.getBaseDn().toString() +
                   " has been interrupted and will die. This is in RS " +

--
Gitblit v1.10.0