From c7077670daca3b689ed75e4bf71dad0483af8473 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 13:27:40 +0000
Subject: [PATCH] Avoided possible costly thread leaks in ReplicationServerDomain.
---
opends/src/server/org/opends/server/replication/server/ServerReader.java | 126 +++++++++++++++---------------------------
1 files changed, 45 insertions(+), 81 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 82dc898..2a33972 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -27,11 +27,6 @@
*/
package org.opends.server.replication.server;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
import java.io.IOException;
import org.opends.messages.Message;
@@ -40,6 +35,12 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.util.StaticUtils.*;
+
/**
* This class implement the part of the replicationServer that is reading
* the connection from the LDAP servers to get all the updates that
@@ -74,7 +75,7 @@
public ServerReader(Session session, ServerHandler handler)
{
super("Replication server RS(" + handler.getReplicationServerId()
- + ") reading from " + handler.toString() + " at "
+ + ") reading from " + handler + " at "
+ session.getReadableRemoteAddress());
this.session = session;
this.handler = handler;
@@ -90,7 +91,7 @@
Message errMessage = null;
if (debugEnabled())
{
- TRACER.debugInfo(this.getName() + " starting");
+ TRACER.debugInfo(getName() + " starting");
}
/*
* wait on input stream
@@ -110,13 +111,14 @@
if (msg instanceof AckMsg)
{
- AckMsg ack = (AckMsg) msg;
handler.checkWindow();
- handler.processAck(ack);
+ handler.processAck((AckMsg) msg);
} else if (msg instanceof UpdateMsg)
{
+ UpdateMsg updateMsg = (UpdateMsg) msg;
+
boolean filtered = false;
- /* Ignore updates in some cases */
+ // Ignore updates in some cases
if (handler.isDataServer())
{
/**
@@ -133,22 +135,22 @@
* better performances in normal mode (most of the time).
*/
ServerStatus dsStatus = handler.getStatus();
- if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
- (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+ if (dsStatus == BAD_GEN_ID_STATUS
+ || dsStatus == FULL_UPDATE_STATUS)
{
long referenceGenerationId = handler.getReferenceGenId();
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+ if (dsStatus == BAD_GEN_ID_STATUS)
logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
handler.getReplicationServerId(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
+ updateMsg.getChangeNumber().toString(),
handler.getBaseDN(), handler.getServerId(),
session.getReadableRemoteAddress(),
handler.getGenerationId(),
referenceGenerationId));
- if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+ if (dsStatus == FULL_UPDATE_STATUS)
logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
handler.getReplicationServerId(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
+ updateMsg.getChangeNumber().toString(),
handler.getBaseDN(), handler.getServerId(),
session.getReadableRemoteAddress()));
filtered = true;
@@ -159,14 +161,14 @@
* Ignore updates from RS with bad gen id
* (no system managed status for a RS)
*/
- long referenceGenerationId =handler.getReferenceGenId();
- if ((referenceGenerationId > 0) &&
- (referenceGenerationId != handler.getGenerationId()))
+ long referenceGenerationId = handler.getReferenceGenId();
+ if (referenceGenerationId > 0
+ && referenceGenerationId != handler.getGenerationId())
{
logError(
WARN_IGNORING_UPDATE_FROM_RS.get(
handler.getReplicationServerId(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
+ updateMsg.getChangeNumber().toString(),
handler.getBaseDN(),
handler.getServerId(),
session.getReadableRemoteAddress(),
@@ -178,53 +180,24 @@
if (!filtered)
{
- UpdateMsg update = (UpdateMsg) msg;
- handler.decAndCheckWindow();
- handler.put(update);
+ handler.put(updateMsg);
}
} else if (msg instanceof WindowMsg)
{
- WindowMsg windowMsg = (WindowMsg) msg;
- handler.updateWindow(windowMsg);
- } else if (msg instanceof InitializeRequestMsg)
+ handler.updateWindow((WindowMsg) msg);
+ } else if (msg instanceof RoutableMsg)
{
- InitializeRequestMsg initializeMsg =
- (InitializeRequestMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof InitializeRcvAckMsg)
- {
- InitializeRcvAckMsg initializeRcvAckMsg =
- (InitializeRcvAckMsg) msg;
- handler.process(initializeRcvAckMsg);
- } else if (msg instanceof InitializeTargetMsg)
- {
- InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof EntryMsg)
- {
- EntryMsg entryMsg = (EntryMsg) msg;
- handler.process(entryMsg);
- } else if (msg instanceof DoneMsg)
- {
- DoneMsg doneMsg = (DoneMsg) msg;
- handler.process(doneMsg);
- } else if (msg instanceof ErrorMsg)
- {
- ErrorMsg errorMsg = (ErrorMsg) msg;
- handler.process(errorMsg);
+ handler.process((RoutableMsg) msg);
} else if (msg instanceof ResetGenerationIdMsg)
{
- ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
- handler.processResetGenId(genIdMsg);
+ handler.processResetGenId((ResetGenerationIdMsg) msg);
} else if (msg instanceof WindowProbeMsg)
{
- WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
- handler.process(windowProbeMsg);
+ handler.replyToWindowProbe();
} else if (msg instanceof TopologyMsg)
{
- TopologyMsg topoMsg = (TopologyMsg) msg;
ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
- rsh.receiveTopoInfoFromRS(topoMsg);
+ rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
} else if (msg instanceof ChangeStatusMsg)
{
ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
@@ -242,28 +215,18 @@
csMsg.toString());
logError(errMessage);
}
- } else if (msg instanceof MonitorRequestMsg)
- {
- MonitorRequestMsg replServerMonitorRequestMsg =
- (MonitorRequestMsg) msg;
- handler.process(replServerMonitorRequestMsg);
- } else if (msg instanceof MonitorMsg)
- {
- MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
- handler.process(replServerMonitorMsg);
} else if (msg instanceof ChangeTimeHeartbeatMsg)
{
- ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
- handler.process(cthbMsg);
+ handler.process((ChangeTimeHeartbeatMsg) msg);
} else if (msg instanceof StopMsg)
{
// Peer server is properly disconnecting: go out of here to
// properly close the server handler going to finally block.
if (debugEnabled())
{
- TRACER.debugInfo(handler.toString() + " has properly " +
- "disconnected from this replication server " +
- Integer.toString(handler.getReplicationServerId()));
+ TRACER.debugInfo(handler
+ + " has properly disconnected from this replication server "
+ + handler.getReplicationServerId());
}
return;
} else if (msg == null)
@@ -281,9 +244,7 @@
// Received a V1 PDU we do not need to support:
// we just trash the message and log the event for debug purpose,
// then continue receiving messages.
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+ logException(e);
}
}
}
@@ -294,9 +255,7 @@
* Log a message and exit from this loop
* So that this handler is stopped.
*/
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+ logException(e);
if (!handler.shuttingDown())
{
if (handler.isDataServer())
@@ -316,9 +275,7 @@
}
catch (Exception e)
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+ logException(e);
/*
* The remote server has sent an unknown message,
* close the connection.
@@ -334,14 +291,21 @@
*/
if (debugEnabled())
{
- TRACER.debugInfo("In " + this.getName() + " closing the session");
+ TRACER.debugInfo("In " + getName() + " closing the session");
}
session.close();
handler.doStop();
if (debugEnabled())
{
- TRACER.debugInfo(this.getName() + " stopped " + errMessage);
+ TRACER.debugInfo(getName() + " stopped: " + errMessage);
}
}
}
+
+ private void logException(Exception e)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + getName() + " " + stackTraceToSingleLineString(e));
+ }
}
--
Gitblit v1.10.0