From 296090552c8bb9ad83cfa819fbf0bd9d0ba8fac2 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 18 Apr 2011 15:55:45 +0000
Subject: [PATCH] Fix OpenDJ-117: IllegalMonitorStateException during server shutdown
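
Acquiring the domain ReentrantLock with lock() can be interrupted, for
example when threads are interrupted during server shutdown. Several call
sites swallowed the resulting InterruptedException ("Try doing job
anyway...") and then fell through to a shared finally block that called
release(), so a thread that had never acquired the lock ended up releasing
it, triggering the IllegalMonitorStateException reported in OpenDJ-117.

Make lock acquisition fail fast instead: when lock() is interrupted, the
calling thread re-asserts its interrupt status and returns
(createLocalTopologyMonitorMsg returns null), and release() is now only
reached from paths that actually hold the lock.
ServerHandler.lockDomain() declares InterruptedException rather than
logging it and continuing without the lock. Related cleanups:
DataServerHandler and ECLServerHandler no longer fall through to cast the
received message after abortStart(), and ReplicationServerHandler throws a
DirectoryException on an unexpected message instead of calling
abortStart() directly.

A minimal, self-contained sketch of the locking pattern applied throughout
ReplicationServerDomain (illustrative only: the class and method names
below are hypothetical, doWork() stands in for the domain-specific
processing, and lock()/release() mirror the domain's wrappers on the
assumption that lock() delegates to ReentrantLock.lockInterruptibly(),
consistent with it throwing InterruptedException):

    import java.util.concurrent.locks.ReentrantLock;

    public class LockPatternSketch
    {
      private final ReentrantLock domainLock = new ReentrantLock();

      private void lock() throws InterruptedException
      {
        domainLock.lockInterruptibly();
      }

      private void release()
      {
        domainLock.unlock();
      }

      private void doWork()
      {
        // Placeholder for the domain-specific processing.
      }

      /**
       * Old pattern: the interrupt was swallowed, so release() could run on a
       * thread that never acquired the lock, and unlock() then threw
       * IllegalMonitorStateException.
       */
      public void beforeFix()
      {
        try
        {
          try
          {
            lock();
          }
          catch (InterruptedException ex)
          {
            // Try doing job anyway...
          }
          doWork();
        }
        finally
        {
          release();
        }
      }

      /**
       * New pattern: fail fast on interruption; unlock only when the lock is
       * really held.
       */
      public void afterFix()
      {
        try
        {
          lock();
        }
        catch (InterruptedException ex)
        {
          // Re-interrupt so the interruption is caught during subsequent IO.
          Thread.currentThread().interrupt();
          return;
        }
        try
        {
          doWork();
        }
        finally
        {
          release();
        }
      }
    }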
---
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java        |   1 +
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java         |  37 ++--
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java  | 261 ++++++++++++-----------
 opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java |   7 +-
 opends/src/server/org/opends/server/replication/server/ServerHandler.java            | 132 ++++++-------
 opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java           |   4 ++--
 6 files changed, 235 insertions(+), 207 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index a32519c..c0f67f9 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -706,6 +706,7 @@
Message message = Message.raw(
"Protocol error: StartSessionMsg required." + msg + " received.");
abortStart(message);
+ return null;
}
// Process StartSessionMsg sent by remote DS
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index b0df1e3..624c058 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -486,13 +486,13 @@
StartECLSessionMsg inStartECLSessionMsg =
waitAndProcessStartSessionECLFromRemoteServer();
if (inStartECLSessionMsg == null)
- {
- // client wants to properly close the connection (client sent a
- // StopMsg)
- logStopReceived();
- abortStart(null);
- return;
- }
+ {
+ // client wants to properly close the connection (client sent a
+ // StopMsg)
+ logStopReceived();
+ abortStart(null);
+ return;
+ }
logStartECLSessionHandshake(inStartECLSessionMsg);
@@ -539,17 +539,20 @@
// client wants to stop handshake (was just for handshake phase one for RS
// choice). Return null to make the session be terminated.
return null;
- } else if (!(msg instanceof StartECLSessionMsg))
- {
- Message message = Message.raw(
- "Protocol error: StartECLSessionMsg required." + msg + " received.");
- abortStart(message);
}
-
- // Process StartSessionMsg sent by remote DS
- StartECLSessionMsg startECLSessionMsg = (StartECLSessionMsg) msg;
-
- return startECLSessionMsg;
+ else if (!(msg instanceof StartECLSessionMsg))
+ {
+ Message message = Message
+ .raw("Protocol error: StartECLSessionMsg required." + msg
+ + " received.");
+ abortStart(message);
+ return null;
+ }
+ else
+ {
+ // Process StartSessionMsg sent by remote DS
+ return (StartECLSessionMsg) msg;
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d147ea4..64ac53c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1064,22 +1064,25 @@
if (!handler.engageShutdown())
// Only do this once (prevent other thread to enter here again)
{
- try
+ if (!shutdown)
{
try
{
-
// Acquire lock on domain (see more details in comment of start()
// method of ServerHandler)
- if (!shutdown)
- {
- lock();
- }
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
+ lock();
}
+ catch (InterruptedException ex)
+ {
+ // We can't deal with this here, so re-interrupt thread so that it is
+ // caught during subsequent IO.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ try
+ {
// Stop useless monitoring publisher if no more RS or DS in domain
if ( (directoryServers.size() + replicationServers.size() )== 1)
{
@@ -1179,15 +1182,20 @@
{
try
{
- try
- {
- // Acquire lock on domain (see more details in comment of start()
- // method of ServerHandler)
- lock();
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
- }
+ // Acquire lock on domain (see more details in comment of start() method
+ // of ServerHandler)
+ lock();
+ }
+ catch (InterruptedException ex)
+ {
+ // We can't deal with this here, so re-interrupt thread so that it is
+ // caught during subsequent IO.
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ try
+ {
if (otherHandlers.contains(handler))
{
unRegisterHandler(handler);
@@ -1779,28 +1787,35 @@
return returnMsg;
}
+
+
/**
* Creates a new monitor message including monitoring information for the
- * topology directly connected to this RS. This includes information for:
- * - local RS
- * - all direct DSs
- * - all direct RSs
- * @param sender The sender of this message.
- * @param destination The destination of this message.
- * @return The newly created and filled MonitorMsg. Null if a problem occurred
- * during message creation.
+ * topology directly connected to this RS. This includes information for: -
+ * local RS - all direct DSs - all direct RSs
+ *
+ * @param sender
+ * The sender of this message.
+ * @param destination
+ * The destination of this message.
+ * @return The newly created and filled MonitorMsg. Null if the current thread
+ * was interrupted while attempting to get the domain lock.
*/
public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
{
- MonitorMsg monitorMsg = null;
-
- try {
-
+ try
+ {
// Lock domain as we need to go through connected servers list
lock();
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
- monitorMsg = new MonitorMsg(sender, destination);
-
+ try
+ {
+ MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
// Populate for each connected LDAP Server
// from the states stored in the serverHandler.
@@ -1808,35 +1823,27 @@
// - the older missing change
for (DataServerHandler lsh : this.directoryServers.values())
{
- monitorMsg.setServerState(
- lsh.getServerId(),
- lsh.getServerState(),
- lsh.getApproxFirstMissingDate(),
- true);
+ monitorMsg.setServerState(lsh.getServerId(),
+ lsh.getServerState(), lsh.getApproxFirstMissingDate(),
+ true);
}
// Same for the connected RS
for (ReplicationServerHandler rsh : this.replicationServers.values())
{
- monitorMsg.setServerState(
- rsh.getServerId(),
- rsh.getServerState(),
- rsh.getApproxFirstMissingDate(),
- false);
+ monitorMsg.setServerState(rsh.getServerId(),
+ rsh.getServerState(), rsh.getApproxFirstMissingDate(),
+ false);
}
// Populate the RS state in the msg from the DbState
monitorMsg.setReplServerDbState(this.getDbServerState());
- } catch(InterruptedException e)
- {
- // At lock, too bad...
- } finally
- {
- if (hasLock())
- release();
+ return monitorMsg;
}
-
- return monitorMsg;
+ finally
+ {
+ release();
+ }
}
/**
@@ -2126,18 +2133,23 @@
"In " + this +
" Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
" for baseDn " + baseDn + ":\n" + genIdMsg);
+
try
{
- try
- {
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
- }
+ // Acquire lock on domain (see more details in comment of start() method
+ // of ServerHandler)
+ lock();
+ }
+ catch (InterruptedException ex)
+ {
+ // We can't deal with this here, so re-interrupt thread so that it is
+ // caught during subsequent IO.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ try
+ {
long newGenId = genIdMsg.getGenerationId();
if (newGenId != this.generationId)
@@ -2233,16 +2245,20 @@
try
{
- try
- {
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
- }
+ // Acquire lock on domain (see more details in comment of start() method
+ // of ServerHandler)
+ lock();
+ }
+ catch (InterruptedException ex)
+ {
+ // We can't deal with this here, so re-interrupt thread so that it is
+ // caught during subsequent IO.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ try
+ {
ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
if (newStatus == ServerStatus.INVALID_STATUS)
{
@@ -2258,7 +2274,6 @@
Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
senderHandler.getServerId(), baseDn, newStatus.toString());
logError(message);
-
}
catch(Exception e)
{
@@ -2278,17 +2293,16 @@
* @param event The event to be used for new status computation
* @return True if we have been interrupted (must stop), false otherwise
*/
- public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
- StatusMachineEvent event)
+ public boolean changeStatusFromStatusAnalyzer(
+ DataServerHandler serverHandler, StatusMachineEvent event)
{
try
{
- try
- {
// Acquire lock on domain (see more details in comment of start() method
// of ServerHandler)
lock();
- } catch (InterruptedException ex)
+ }
+ catch (InterruptedException ex)
{
// We have been interrupted for dying, from stopStatusAnalyzer
// to prevent deadlock in this situation:
@@ -2299,43 +2313,50 @@
// waiting for analyzer thread death, a deadlock occurs. So we force
// interruption of the status analyzer thread death after 2 seconds if
// it has not finished (see StatusAnalyzer.waitForShutdown). This allows
- // to have the analyzer thread taking the domain lock only when the status
- // of a DS has to be changed. See more comments in run method of
+ // to have the analyzer thread taking the domain lock only when the
+ // status of a DS has to be changed. See more comments in run method of
// StatusAnalyzer.
if (debugEnabled())
- TRACER.debugInfo(
- "Status analyzer for domain " + baseDn + " has been interrupted when"
- + " trying to acquire domain lock for changing the status of DS " +
- serverHandler.getServerId());
+ TRACER
+ .debugInfo("Status analyzer for domain "
+ + baseDn
+ + " has been interrupted when"
+ + " trying to acquire domain lock for changing the status"
+ + " of DS "
+ + serverHandler.getServerId());
return true;
}
- ServerStatus newStatus = ServerStatus.INVALID_STATUS;
- ServerStatus oldStatus = serverHandler.getStatus();
try
{
- newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
- } catch (IOException e)
- {
- logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER.get(baseDn.
- toString(),
- Integer.toString(serverHandler.getServerId()),
- e.getMessage()));
- }
+ ServerStatus newStatus = ServerStatus.INVALID_STATUS;
+ ServerStatus oldStatus = serverHandler.getStatus();
+ try
+ {
+ newStatus = serverHandler
+ .changeStatusFromStatusAnalyzer(event);
+ }
+ catch (IOException e)
+ {
+ logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
+ .get(baseDn.toString(),
+ Integer.toString(serverHandler.getServerId()),
+ e.getMessage()));
+ }
- if ( (newStatus == ServerStatus.INVALID_STATUS) ||
- (newStatus == oldStatus) )
- {
- // Change was impossible or already occurred (see StatusAnalyzer comments)
- return false;
- }
+ if ((newStatus == ServerStatus.INVALID_STATUS)
+ || (newStatus == oldStatus))
+ {
+ // Change was impossible or already occurred (see StatusAnalyzer
+ // comments)
+ return false;
+ }
- // Update every peers (RS/DS) with topology changes
- buildAndSendTopoInfoToDSs(serverHandler);
- buildAndSendTopoInfoToRSs();
-
+ // Update every peers (RS/DS) with topology changes
+ buildAndSendTopoInfoToDSs(serverHandler);
+ buildAndSendTopoInfoToRSs();
}
- catch(Exception e)
+ catch (Exception e)
{
logError(Message.raw(Category.SYNC, Severity.NOTICE,
stackTraceToSingleLineString(e)));
@@ -2344,6 +2365,7 @@
{
release();
}
+
return false;
}
@@ -2456,11 +2478,14 @@
{
// Acquire lock on domain (see more details in comment of start() method
// of ServerHandler)
- if (!hasLock())
- lock();
- } catch (InterruptedException ex)
+ lock();
+ }
+ catch (InterruptedException ex)
{
- // Try doing job anyway...
+ // We can't deal with this here, so re-interrupt thread so that it is
+ // caught during subsequent IO.
+ Thread.currentThread().interrupt();
+ return;
}
try
@@ -2502,7 +2527,6 @@
* DS we have.
*/
buildAndSendTopoInfoToDSs(null);
-
}
catch(Exception e)
{
@@ -2867,7 +2891,7 @@
* - when creating and sending a TopologyMsg
* - when a DS status is changing (ChangeStatusMsg received or sent)...
*/
- private ReentrantLock lock = new ReentrantLock();
+ private final ReentrantLock lock = new ReentrantLock();
/**
* This lock is used to protect the generationid variable.
@@ -3344,9 +3368,13 @@
// Acquire lock on domain (see more details in comment of start() method
// of ServerHandler)
lock();
- } catch (InterruptedException ex)
+ }
+ catch (InterruptedException ex)
{
- // Try doing job anyway...
+ // We can't deal with this here, so re-interrupt thread so that it is
+ // caught during subsequent IO.
+ Thread.currentThread().interrupt();
+ return;
}
try
@@ -3356,7 +3384,8 @@
{
// If we are the first replication server warned,
// then forwards the message to the remote replication servers
- for (ReplicationServerHandler rsHandler : replicationServers.values())
+ for (ReplicationServerHandler rsHandler : replicationServers
+ .values())
{
try
{
@@ -3365,12 +3394,14 @@
{
rsHandler.send(msg);
}
- } catch (IOException e)
+ }
+ catch (IOException e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
- "Replication Server " + replicationServer.getReplicationPort() +
- " " + baseDn + " " + replicationServer.getServerId()));
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG
+ .get("Replication Server "
+ + replicationServer.getReplicationPort() + " "
+ + baseDn + " " + replicationServer.getServerId()));
stopServer(rsHandler, false);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 3c35331..9b0ac2b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -178,7 +178,6 @@
try
{
- //
lockDomain(false); // no timeout
// Send start
@@ -260,15 +259,11 @@
logError(message);
super.finalizeStart();
-
}
catch(IOException ioe)
{
// FIXME receive
}
- // catch(DirectoryException de)
- //{ already logged
- //
catch(Exception e)
{
// FIXME more detailed exceptions
@@ -528,7 +523,7 @@
msg.getClass().getCanonicalName(),
"TopologyMsg");
}
- abortStart(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
}
// Remote RS sent his topo msg
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index b00939a..c3e2b86 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -832,13 +832,20 @@
return (!this.isDataServer());
}
+
+
/**
* Lock the domain potentially with a timeout.
- * @param timedout The provided timeout.
- * @throws DirectoryException When an exception occurs.
+ *
+ * @param timedout
+ * The provided timeout.
+ * @throws DirectoryException
+ * When an exception occurs.
+ * @throws InterruptedException
+ * If the current thread was interrupted while waiting for the lock.
*/
protected void lockDomain(boolean timedout)
- throws DirectoryException
+ throws DirectoryException, InterruptedException
{
// The handshake phase must be done by blocking any access to structures
// keeping info on connected servers, so that one can safely check for
@@ -859,73 +866,64 @@
// If domain already exists, lock it until handshake is finished otherwise
// it will be created and locked later in the method
- try
+ if (!timedout)
{
- if (!timedout)
- {
- // !timedout
- if (!replicationServerDomain.hasLock())
- replicationServerDomain.lock();
- }
- else
- {
- // timedout
- /**
- * Take the lock on the domain.
- * WARNING: Here we try to acquire the lock with a timeout. This
- * is for preventing a deadlock that may happen if there are cross
- * connection attempts (for same domain) from this replication
- * server and from a peer one:
- * Here is the scenario:
- * - RS1 connect thread takes the domain lock and starts
- * connection to RS2
- * - at the same time RS2 connect thread takes his domain lock and
- * start connection to RS2
- * - RS2 listen thread starts processing received
- * ReplServerStartMsg from RS1 and wants to acquire the lock on
- * the domain (here) but cannot as RS2 connect thread already has
- * it
- * - RS1 listen thread starts processing received
- * ReplServerStartMsg from RS2 and wants to acquire the lock on
- * the domain (here) but cannot as RS1 connect thread already has
- * it
- * => Deadlock: 4 threads are locked.
- * So to prevent that in such situation, the listen threads here
- * will both timeout trying to acquire the lock. The random time
- * for the timeout should allow on connection attempt to be
- * aborted whereas the other one should have time to finish in the
- * same time.
- * Warning: the minimum time (3s) should be big enough to allow
- * normal situation connections to terminate. The added random
- * time should represent a big enough range so that the chance to
- * have one listen thread timing out a lot before the peer one is
- * great. When the first listen thread times out, the remote
- * connect thread should release the lock and allow the peer
- * listen thread to take the lock it was waiting for and process
- * the connection attempt.
- */
- Random random = new Random();
- int randomTime = random.nextInt(6); // Random from 0 to 5
- // Wait at least 3 seconds + (0 to 5 seconds)
- long timeout = (long) (3000 + ( randomTime * 1000 ) );
- boolean noTimeout = replicationServerDomain.tryLock(timeout);
- if (!noTimeout)
- {
- // Timeout
- Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
- getServiceId(),
- serverId,
- session.getReadableRemoteAddress(),
- replicationServerId);
- throw new DirectoryException(ResultCode.OTHER, message);
- }
- }
+ // !timedout
+ if (!replicationServerDomain.hasLock())
+ replicationServerDomain.lock();
}
- catch (InterruptedException e)
+ else
{
- // Thread interrupted
- Message message = ERR_EXCEPTION_LOCKING_RS_DOMAIN.get(e.getMessage());
- logError(message);
+ // timedout
+ /**
+ * Take the lock on the domain.
+ * WARNING: Here we try to acquire the lock with a timeout. This
+ * is for preventing a deadlock that may happen if there are cross
+ * connection attempts (for same domain) from this replication
+ * server and from a peer one:
+ * Here is the scenario:
+ * - RS1 connect thread takes the domain lock and starts
+ * connection to RS2
+ * - at the same time RS2 connect thread takes his domain lock and
+ * start connection to RS2
+ * - RS2 listen thread starts processing received
+ * ReplServerStartMsg from RS1 and wants to acquire the lock on
+ * the domain (here) but cannot as RS2 connect thread already has
+ * it
+ * - RS1 listen thread starts processing received
+ * ReplServerStartMsg from RS2 and wants to acquire the lock on
+ * the domain (here) but cannot as RS1 connect thread already has
+ * it
+ * => Deadlock: 4 threads are locked.
+ * So to prevent that in such situation, the listen threads here
+ * will both timeout trying to acquire the lock. The random time
+ * for the timeout should allow on connection attempt to be
+ * aborted whereas the other one should have time to finish in the
+ * same time.
+ * Warning: the minimum time (3s) should be big enough to allow
+ * normal situation connections to terminate. The added random
+ * time should represent a big enough range so that the chance to
+ * have one listen thread timing out a lot before the peer one is
+ * great. When the first listen thread times out, the remote
+ * connect thread should release the lock and allow the peer
+ * listen thread to take the lock it was waiting for and process
+ * the connection attempt.
+ */
+ Random random = new Random();
+ int randomTime = random.nextInt(6); // Random from 0 to 5
+ // Wait at least 3 seconds + (0 to 5 seconds)
+ long timeout = (long) (3000 + ( randomTime * 1000 ) );
+ boolean noTimeout = replicationServerDomain.tryLock(timeout);
+ if (!noTimeout)
+ {
+ // Timeout
+ Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+ getServiceId(),
+ serverId,
+ session.getReadableRemoteAddress(),
+ replicationServerId);
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 111518d..05d97d0 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -155,7 +155,7 @@
StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
if (interrupted)
{
- // Finih job and let thread die
+ // Finish job and let thread die
TRACER.debugInfo("Status analyzer for dn " +
replicationServerDomain.getBaseDn().toString() +
" has been interrupted and will die. This is in RS " +
@@ -173,7 +173,7 @@
StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
if (interrupted)
{
- // Finih job and let thread die
+ // Finish job and let thread die
TRACER.debugInfo("Status analyzer for dn " +
replicationServerDomain.getBaseDn().toString() +
" has been interrupted and will die. This is in RS " +
--
Gitblit v1.10.0