From d2db6915a220002a55281ebeb94fc8c590a33853 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 14 Apr 2011 16:45:52 +0000
Subject: [PATCH] Another fix for issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 216 +++++++----------------------------------------------
1 files changed, 31 insertions(+), 185 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1e8de0f..9559252 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -67,24 +67,7 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.MonitorMsg;
-import org.opends.server.replication.protocol.MonitorRequestMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplServerStartDSMsg;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ServerStartECLMsg;
-import org.opends.server.replication.protocol.ServerStartMsg;
-import org.opends.server.replication.protocol.StartECLSessionMsg;
-import org.opends.server.replication.protocol.StartSessionMsg;
-import org.opends.server.replication.protocol.StopMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.protocol.WindowMsg;
-import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.replication.protocol.*;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
@@ -780,7 +763,7 @@
{
// Connect to server and get info about it
ReplicationServerInfo replicationServerInfo =
- performPhaseOneHandshake(server, false);
+ performPhaseOneHandshake(server, false, false);
// Store server info in list
if (replicationServerInfo != null)
@@ -808,11 +791,10 @@
// FIXME:ECL List of RS to connect is for now limited to one RS only
String bestServer = this.servers.iterator().next();
- ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
- bestServer, true);
-
- if (inReplServerStartDSMsg != null)
+ if (performPhaseOneHandshake(bestServer, true, true) != null)
+ {
performECLPhaseTwoHandshake(bestServer);
+ }
}
/**
@@ -887,7 +869,7 @@
" phase 2 : will perform PhaseOneH with the preferred RS="
+ replicationServerInfo);
replicationServerInfo = performPhaseOneHandshake(
- replicationServerInfo.getServerURL(), true);
+ replicationServerInfo.getServerURL(), true, false);
if (replicationServerInfo != null)
{
@@ -1170,10 +1152,12 @@
* Do we keep session opened or not after handshake. Use true if want
* to perform handshake phase 2 with the same session and keep the
* session to create as the current one.
+ * @param isECL
+ * Indicates whether or not the an ECL handshake is to be performed.
* @return The answer from the server . Null if could not get an answer.
*/
private ReplicationServerInfo performPhaseOneHandshake(
- String server, boolean keepConnection)
+ String server, boolean keepConnection, boolean isECL)
{
int separator = server.lastIndexOf(':');
String port = server.substring(separator + 1);
@@ -1202,10 +1186,21 @@
.isSslEncryption(server);
// Send our ServerStartMsg.
- ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
- baseDn, maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(),
- this.getGenerationID(), isSslEncryption, groupId);
+ StartMsg serverStartMsg;
+ if (!isECL)
+ {
+ serverStartMsg = new ServerStartMsg(serverId, baseDn,
+ maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(),
+ this.getGenerationID(), isSslEncryption, groupId);
+ }
+ else
+ {
+ serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0,
+ maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(),
+ this.getGenerationID(), isSslEncryption, groupId);
+ }
localSession.publish(serverStartMsg);
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1236,9 +1231,13 @@
* replication server will use the same one (or an older one if it is an
* old replication server).
*/
- protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo
- .getProtocolVersion());
- localSession.setProtocolVersion(protocolVersion);
+ final short localProtocolVersion = ProtocolVersion
+ .minWithCurrent(replServerInfo.getProtocolVersion());
+ if (keepConnection)
+ {
+ protocolVersion = localProtocolVersion;
+ }
+ localSession.setProtocolVersion(localProtocolVersion);
if (!isSslEncryption)
{
@@ -1318,160 +1317,7 @@
}
}
- /**
- * Connect to the provided server performing the first phase handshake
- * (start messages exchange) and return the reply message from the replication
- * server.
- *
- * @param server Server to connect to.
- * @param keepConnection Do we keep session opened or not after handshake.
- * Use true if want to perform handshake phase 2 with the same session
- * and keep the session to create as the current one.
- * @return The ReplServerStartDSMsg the server replied. Null if could not
- * get an answer.
- */
- private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
- boolean keepConnection)
- {
- // FIXME: this should be merged with performPhaseOneHandshake to avoid
- // code/bug duplication.
- ReplServerStartDSMsg replServerStartDSMsg = null;
- // Parse server string.
- int separator = server.lastIndexOf(':');
- String port = server.substring(separator + 1);
- String hostname = server.substring(0, separator);
- ProtocolSession localSession = null;
-
- boolean error = false;
- try
- {
- /*
- * Open a socket connection to the next candidate.
- */
- int intPort = Integer.parseInt(port);
- InetSocketAddress serverAddr = new InetSocketAddress(
- InetAddress.getByName(hostname), intPort);
- Socket socket = new Socket();
- socket.setReceiveBufferSize(1000000);
- socket.setTcpNoDelay(true);
- socket.connect(serverAddr, 500);
- localSession = replSessionSecurity.createClientSession(socket,
- ReplSessionSecurity.HANDSHAKE_TIMEOUT);
- boolean isSslEncryption =
- replSessionSecurity.isSslEncryption(server);
-
- // Send our start msg.
- ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
- baseDn, 0, 0, 0, 0,
- maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
- isSslEncryption,
- groupId);
- localSession.publish(serverStartECLMsg);
-
- // Read the ReplServerStartMsg that should come back.
- replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
-
- if (debugEnabled())
- {
- debugInfo("In RB for " + baseDn +
- "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
- "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
- }
-
- // Sanity check
- String repDn = replServerStartDSMsg.getBaseDn();
- if (!(this.baseDn.equals(repDn)))
- {
- Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
- this.baseDn);
- logError(message);
- error = true;
- }
-
- /*
- * We have sent our own protocol version to the replication server.
- * The replication server will use the same one (or an older one
- * if it is an old replication server).
- */
- if (keepConnection)
- protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartDSMsg.getVersion());
- localSession.setProtocolVersion(protocolVersion);
-
- if (!isSslEncryption)
- {
- localSession.stopEncryption();
- }
- } catch (ConnectException e)
- {
- /*
- * There was no server waiting on this host:port
- * Log a notice and try the next replicationServer in the list
- */
- if (!connectionError)
- {
- Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
- server, baseDn);
-
- if (keepConnection) // Log error message only for final connection
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- logError(message);
- } else if (debugEnabled())
- {
- debugInfo(message.toString());
- }
- }
- error = true;
- } catch (Exception e)
- {
- if ((e instanceof SocketTimeoutException) && debugEnabled())
- {
- debugInfo("Timeout trying to connect to RS " + server +
- " for dn: " + baseDn);
- }
- Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
- server, baseDn, stackTraceToSingleLineString(e));
- if (keepConnection) // Log error message only for final connection
- {
- logError(message);
- } else if (debugEnabled())
- {
- debugInfo(message.toString());
- }
- error = true;
- }
-
- // Close session if requested
- if (!keepConnection || error)
- {
- if (localSession != null)
- {
- if (debugEnabled()) {
- debugInfo("In RB, closing session after phase 1");
- }
- localSession.close();
- localSession = null;
- }
- if (error)
- {
- replServerStartDSMsg = null;
- } // Be sure to return null.
-
- }
-
- // If this connection as the one to use for sending and receiving updates,
- // store it.
- if (keepConnection)
- {
- session = localSession;
- }
-
- return replServerStartDSMsg;
- }
/**
* Performs the second phase handshake (send StartSessionMsg and receive
--
Gitblit v1.10.0