From 09e3430331f80ba64b3318c5d315c59ed230af1f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 13:27:40 +0000
Subject: [PATCH] Avoided possible costly thread leaks in ReplicationServerDomain.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java | 33 ++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 31 +++-
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java | 23 +--
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 85 ++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 126 +++++++-------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 6
8 files changed, 134 insertions(+), 182 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
index 91e3679..afddeb9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
@@ -29,17 +29,16 @@
import java.util.zip.DataFormatException;
-
/**
- * This message is used by LDAP or Replication Server that have been
- * out of credit for a while and want to check that the remote servers.
- *
- * A sending entity that is blocked because its send window is closed
- * for a while should create such a message to check that the window
- * closure is valid.
- *
+ * This message is used by LDAP or Replication Server that have been out of
+ * credit for a while and want to check if the remote servers is able to accept
+ * more messages.
+ * <p>
+ * A sending entity that is blocked because its send window is closed for a
+ * while should create such a message to check that the window closure is valid.
+ * <p>
* An entity that received such a message should respond with a
- * WindowUpdate message indicating the curent credit available.
+ * {@link WindowMsg} indicating the current credit available.
*/
public class WindowProbeMsg extends ReplicationMsg
{
@@ -72,10 +71,6 @@
public byte[] getBytes(short protocolVersion)
{
// WindowProbeMsg Message only contains its type.
-
- byte[] resultByteArray = new byte[1];
- resultByteArray[0] = MSG_TYPE_WINDOW_PROBE;
-
- return resultByteArray;
+ return new byte[] { MSG_TYPE_WINDOW_PROBE };
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 4b25380..d956b28 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -542,11 +542,7 @@
if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
- startMsg = new ReplServerStartMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServer.getDegradedStatusThreshold());
+ startMsg = createReplServerStartMsg();
}
else
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index eae646a..2399136 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,11 +365,7 @@
if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
- startMsg = new ReplServerStartMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServer.getDegradedStatusThreshold());
+ startMsg = createReplServerStartMsg();
}
else
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index eb678b0..1a5e7bf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -91,8 +91,7 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Monitoring publisher starting for dn "
- + replicationServerDomain.getBaseDn());
+ TRACER.debugInfo(getMessage("Monitoring publisher starting."));
}
try
@@ -134,16 +133,12 @@
}
catch (InterruptedException e)
{
- TRACER.debugInfo("Monitoring publisher for dn "
- + replicationServerDomain.getBaseDn()
- + " in RS " + replicationServerDomain.getLocalRSServerId()
- + " has been interrupted while sleeping.");
+ TRACER.debugInfo(getMessage(
+ "Monitoring publisher has been interrupted while sleeping."));
}
done = true;
- TRACER.debugInfo("Monitoring publisher for dn "
- + replicationServerDomain.getBaseDn() + " is terminated."
- + " This is in RS " + replicationServerDomain.getLocalRSServerId());
+ TRACER.debugInfo(getMessage("Monitoring publisher is terminated."));
}
@@ -160,9 +155,7 @@
if (debugEnabled())
{
- TRACER.debugInfo("Shutting down monitoring publisher for dn "
- + replicationServerDomain.getBaseDn()
- + " in RS " + replicationServerDomain.getLocalRSServerId());
+ TRACER.debugInfo(getMessage("Shutting down monitoring publisher."));
}
}
}
@@ -183,9 +176,7 @@
n++;
if (n >= FACTOR)
{
- TRACER.debugInfo("Interrupting monitoring publisher for dn " +
- replicationServerDomain.getBaseDn() + " in RS " +
- replicationServerDomain.getLocalRSServerId());
+ TRACER.debugInfo(getMessage("Interrupting monitoring publisher."));
interrupt();
}
}
@@ -203,11 +194,17 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Monitoring publisher for dn " +
- replicationServerDomain.getBaseDn() +
- " changing period value to " + period);
+ TRACER.debugInfo(getMessage(
+ "Monitoring publisher changing period value to " + period));
}
this.period = period;
}
+
+ private String getMessage(String message)
+ {
+ return "In RS " + replicationServerDomain.getLocalRSServerId()
+ + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
+ + message;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 099f608..5b8859a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Category;
@@ -75,17 +76,22 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
private final String baseDn;
+
/**
* The Status analyzer that periodically verifies whether the connected DSs
- * are late.
+ * are late. Using an AtomicReference to avoid leaking references to costly
+ * threads.
*/
- private StatusAnalyzer statusAnalyzer = null;
+ private AtomicReference<StatusAnalyzer> statusAnalyzer =
+ new AtomicReference<StatusAnalyzer>();
/**
* The monitoring publisher that periodically sends monitoring messages to the
- * topology.
+ * topology. Using an AtomicReference to avoid leaking references to costly
+ * threads.
*/
- private MonitoringPublisher monitoringPublisher = null;
+ private AtomicReference<MonitoringPublisher> monitoringPublisher =
+ new AtomicReference<MonitoringPublisher>();
/**
* The following map contains one balanced tree for each replica ID to which
@@ -2853,14 +2859,15 @@
*/
public void startStatusAnalyzer()
{
- if (!isRunningStatusAnalyzer())
- {
- int degradedStatusThreshold =
+ int degradedStatusThreshold =
localReplicationServer.getDegradedStatusThreshold();
- if (degradedStatusThreshold > 0) // 0 means no status analyzer
+ if (degradedStatusThreshold > 0) // 0 means no status analyzer
+ {
+ final StatusAnalyzer thread =
+ new StatusAnalyzer(this, degradedStatusThreshold);
+ if (statusAnalyzer.compareAndSet(null, thread))
{
- statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
- statusAnalyzer.start();
+ thread.start();
}
}
}
@@ -2870,35 +2877,26 @@
*/
private void stopStatusAnalyzer()
{
- if (isRunningStatusAnalyzer())
+ final StatusAnalyzer thread = statusAnalyzer.get();
+ if (statusAnalyzer.compareAndSet(thread, null))
{
- statusAnalyzer.shutdown();
- statusAnalyzer.waitForShutdown();
- statusAnalyzer = null;
+ thread.shutdown();
+ thread.waitForShutdown();
}
}
/**
- * Tests if the status analyzer for this domain is running.
- * @return True if the status analyzer is running, false otherwise.
- */
- private boolean isRunningStatusAnalyzer()
- {
- return statusAnalyzer != null;
- }
-
- /**
* Starts the monitoring publisher for the domain if not already started.
*/
public void startMonitoringPublisher()
{
- if (!isRunningMonitoringPublisher())
+ long period = localReplicationServer.getMonitoringPublisherPeriod();
+ if (period > 0) // 0 means no monitoring publisher
{
- long period = localReplicationServer.getMonitoringPublisherPeriod();
- if (period > 0) // 0 means no monitoring publisher
+ final MonitoringPublisher thread = new MonitoringPublisher(this, period);
+ if (monitoringPublisher.compareAndSet(null, thread))
{
- monitoringPublisher = new MonitoringPublisher(this, period);
- monitoringPublisher.start();
+ thread.start();
}
}
}
@@ -2908,24 +2906,15 @@
*/
private void stopMonitoringPublisher()
{
- if (isRunningMonitoringPublisher())
+ final MonitoringPublisher thread = monitoringPublisher.get();
+ if (monitoringPublisher.compareAndSet(thread, null))
{
- monitoringPublisher.shutdown();
- monitoringPublisher.waitForShutdown();
- monitoringPublisher = null;
+ thread.shutdown();
+ thread.waitForShutdown();
}
}
/**
- * Tests if the monitoring publisher for this domain is running.
- * @return True if the monitoring publisher is running, false otherwise.
- */
- private boolean isRunningMonitoringPublisher()
- {
- return monitoringPublisher != null;
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -3360,10 +3349,13 @@
{
// Requested to stop analyzers
stopStatusAnalyzer();
+ return;
}
- else if (isRunningStatusAnalyzer())
+
+ final StatusAnalyzer saThread = statusAnalyzer.get();
+ if (saThread != null) // it is running
{
- statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+ saThread.setDegradedStatusThreshold(degradedStatusThreshold);
}
else if (getConnectedDSs().size() > 0)
{
@@ -3384,10 +3376,13 @@
{
// Requested to stop monitoring publishers
stopMonitoringPublisher();
+ return;
}
- else if (isRunningMonitoringPublisher())
+
+ final MonitoringPublisher mpThread = monitoringPublisher.get();
+ if (mpThread != null) // it is running
{
- monitoringPublisher.setPeriod(period);
+ mpThread.setPeriod(period);
}
else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 1753dd6..94334d1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -113,11 +113,7 @@
*/
private ReplServerStartMsg sendStartToRemote() throws IOException
{
- ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg(
- getReplicationServerId(), getReplicationServerURL(), getBaseDN(),
- maxRcvWindow, replicationServerDomain.getDbServerState(),
- localGenerationId, sslEncryption,
- getLocalGroupId(), replicationServer.getDegradedStatusThreshold());
+ ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg();
send(outReplServerStartMsg);
return outReplServerStartMsg;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 339ce80..18a0593 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -155,12 +155,12 @@
// window
private int rcvWindow;
- private int rcvWindowSizeHalf;
+ private final int rcvWindowSizeHalf;
/**
* The size of the receiving window.
*/
- protected int maxRcvWindow;
+ protected final int maxRcvWindow;
/**
* Semaphore that the writer uses to control the flow to the remote server.
*/
@@ -288,7 +288,7 @@
*
* @throws IOException when the session becomes unavailable.
*/
- public synchronized void decAndCheckWindow() throws IOException
+ private synchronized void decAndCheckWindow() throws IOException
{
rcvWindow--;
checkWindow();
@@ -318,8 +318,7 @@
* and monitoring system.
* @throws DirectoryException When an exception is raised.
*/
- protected void finalizeStart()
- throws DirectoryException
+ protected void finalizeStart() throws DirectoryException
{
// FIXME:ECL We should refactor so that a SH always have a session
if (session != null)
@@ -906,11 +905,10 @@
/**
* Process the reception of a WindowProbeMsg message.
*
- * @param windowProbeMsg The message to process.
- *
- * @throws IOException When the session becomes unavailable.
+ * @throws IOException
+ * When the session becomes unavailable.
*/
- public void process(WindowProbeMsg windowProbeMsg) throws IOException
+ public void replyToWindowProbe() throws IOException
{
if (rcvWindow > 0)
{
@@ -1250,6 +1248,7 @@
*/
public void put(UpdateMsg update) throws IOException
{
+ decAndCheckWindow();
if (replicationServerDomain!=null)
replicationServerDomain.put(update, this);
}
@@ -1262,4 +1261,18 @@
if (replicationServerDomain!=null)
replicationServerDomain.stopServer(this, false);
}
+
+ /**
+ * Creates a ReplServerStartMsg for the current ServerHandler.
+ *
+ * @return a new ReplServerStartMsg for the current ServerHandler.
+ */
+ protected ReplServerStartMsg createReplServerStartMsg()
+ {
+ return new ReplServerStartMsg(getReplicationServerId(),
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ replicationServerDomain.getDbServerState(), localGenerationId,
+ sslEncryption, getLocalGroupId(),
+ replicationServer.getDegradedStatusThreshold());
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 82dc898..2a33972 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -27,11 +27,6 @@
*/
package org.opends.server.replication.server;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
import java.io.IOException;
import org.opends.messages.Message;
@@ -40,6 +35,12 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.util.StaticUtils.*;
+
/**
* This class implement the part of the replicationServer that is reading
* the connection from the LDAP servers to get all the updates that
@@ -74,7 +75,7 @@
public ServerReader(Session session, ServerHandler handler)
{
super("Replication server RS(" + handler.getReplicationServerId()
- + ") reading from " + handler.toString() + " at "
+ + ") reading from " + handler + " at "
+ session.getReadableRemoteAddress());
this.session = session;
this.handler = handler;
@@ -90,7 +91,7 @@
Message errMessage = null;
if (debugEnabled())
{
- TRACER.debugInfo(this.getName() + " starting");
+ TRACER.debugInfo(getName() + " starting");
}
/*
* wait on input stream
@@ -110,13 +111,14 @@
if (msg instanceof AckMsg)
{
- AckMsg ack = (AckMsg) msg;
handler.checkWindow();
- handler.processAck(ack);
+ handler.processAck((AckMsg) msg);
} else if (msg instanceof UpdateMsg)
{
+ UpdateMsg updateMsg = (UpdateMsg) msg;
+
boolean filtered = false;
- /* Ignore updates in some cases */
+ // Ignore updates in some cases
if (handler.isDataServer())
{
/**
@@ -133,22 +135,22 @@
* better performances in normal mode (most of the time).
*/
ServerStatus dsStatus = handler.getStatus();
- if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
- (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+ if (dsStatus == BAD_GEN_ID_STATUS
+ || dsStatus == FULL_UPDATE_STATUS)
{
long referenceGenerationId = handler.getReferenceGenId();
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+ if (dsStatus == BAD_GEN_ID_STATUS)
logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
handler.getReplicationServerId(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
+ updateMsg.getChangeNumber().toString(),
handler.getBaseDN(), handler.getServerId(),
session.getReadableRemoteAddress(),
handler.getGenerationId(),
referenceGenerationId));
- if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+ if (dsStatus == FULL_UPDATE_STATUS)
logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
handler.getReplicationServerId(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
+ updateMsg.getChangeNumber().toString(),
handler.getBaseDN(), handler.getServerId(),
session.getReadableRemoteAddress()));
filtered = true;
@@ -159,14 +161,14 @@
* Ignore updates from RS with bad gen id
* (no system managed status for a RS)
*/
- long referenceGenerationId =handler.getReferenceGenId();
- if ((referenceGenerationId > 0) &&
- (referenceGenerationId != handler.getGenerationId()))
+ long referenceGenerationId = handler.getReferenceGenId();
+ if (referenceGenerationId > 0
+ && referenceGenerationId != handler.getGenerationId())
{
logError(
WARN_IGNORING_UPDATE_FROM_RS.get(
handler.getReplicationServerId(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
+ updateMsg.getChangeNumber().toString(),
handler.getBaseDN(),
handler.getServerId(),
session.getReadableRemoteAddress(),
@@ -178,53 +180,24 @@
if (!filtered)
{
- UpdateMsg update = (UpdateMsg) msg;
- handler.decAndCheckWindow();
- handler.put(update);
+ handler.put(updateMsg);
}
} else if (msg instanceof WindowMsg)
{
- WindowMsg windowMsg = (WindowMsg) msg;
- handler.updateWindow(windowMsg);
- } else if (msg instanceof InitializeRequestMsg)
+ handler.updateWindow((WindowMsg) msg);
+ } else if (msg instanceof RoutableMsg)
{
- InitializeRequestMsg initializeMsg =
- (InitializeRequestMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof InitializeRcvAckMsg)
- {
- InitializeRcvAckMsg initializeRcvAckMsg =
- (InitializeRcvAckMsg) msg;
- handler.process(initializeRcvAckMsg);
- } else if (msg instanceof InitializeTargetMsg)
- {
- InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof EntryMsg)
- {
- EntryMsg entryMsg = (EntryMsg) msg;
- handler.process(entryMsg);
- } else if (msg instanceof DoneMsg)
- {
- DoneMsg doneMsg = (DoneMsg) msg;
- handler.process(doneMsg);
- } else if (msg instanceof ErrorMsg)
- {
- ErrorMsg errorMsg = (ErrorMsg) msg;
- handler.process(errorMsg);
+ handler.process((RoutableMsg) msg);
} else if (msg instanceof ResetGenerationIdMsg)
{
- ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
- handler.processResetGenId(genIdMsg);
+ handler.processResetGenId((ResetGenerationIdMsg) msg);
} else if (msg instanceof WindowProbeMsg)
{
- WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
- handler.process(windowProbeMsg);
+ handler.replyToWindowProbe();
} else if (msg instanceof TopologyMsg)
{
- TopologyMsg topoMsg = (TopologyMsg) msg;
ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
- rsh.receiveTopoInfoFromRS(topoMsg);
+ rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
} else if (msg instanceof ChangeStatusMsg)
{
ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
@@ -242,28 +215,18 @@
csMsg.toString());
logError(errMessage);
}
- } else if (msg instanceof MonitorRequestMsg)
- {
- MonitorRequestMsg replServerMonitorRequestMsg =
- (MonitorRequestMsg) msg;
- handler.process(replServerMonitorRequestMsg);
- } else if (msg instanceof MonitorMsg)
- {
- MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
- handler.process(replServerMonitorMsg);
} else if (msg instanceof ChangeTimeHeartbeatMsg)
{
- ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
- handler.process(cthbMsg);
+ handler.process((ChangeTimeHeartbeatMsg) msg);
} else if (msg instanceof StopMsg)
{
// Peer server is properly disconnecting: go out of here to
// properly close the server handler going to finally block.
if (debugEnabled())
{
- TRACER.debugInfo(handler.toString() + " has properly " +
- "disconnected from this replication server " +
- Integer.toString(handler.getReplicationServerId()));
+ TRACER.debugInfo(handler
+ + " has properly disconnected from this replication server "
+ + handler.getReplicationServerId());
}
return;
} else if (msg == null)
@@ -281,9 +244,7 @@
// Received a V1 PDU we do not need to support:
// we just trash the message and log the event for debug purpose,
// then continue receiving messages.
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+ logException(e);
}
}
}
@@ -294,9 +255,7 @@
* Log a message and exit from this loop
* So that this handler is stopped.
*/
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+ logException(e);
if (!handler.shuttingDown())
{
if (handler.isDataServer())
@@ -316,9 +275,7 @@
}
catch (Exception e)
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+ logException(e);
/*
* The remote server has sent an unknown message,
* close the connection.
@@ -334,14 +291,21 @@
*/
if (debugEnabled())
{
- TRACER.debugInfo("In " + this.getName() + " closing the session");
+ TRACER.debugInfo("In " + getName() + " closing the session");
}
session.close();
handler.doStop();
if (debugEnabled())
{
- TRACER.debugInfo(this.getName() + " stopped " + errMessage);
+ TRACER.debugInfo(getName() + " stopped: " + errMessage);
}
}
}
+
+ private void logException(Exception e)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + getName() + " " + stackTraceToSingleLineString(e));
+ }
}
--
Gitblit v1.10.0