From 96eaa516a85e620a6b76a64ffbe71cdc6037e026 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 13 Apr 2011 16:23:40 +0000
Subject: [PATCH] Initial fix for OpenDJ-97: Very many minor problems with the error logging for replication
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 326 +++++++++++++++++++++++++++--------------------------
1 files changed, 166 insertions(+), 160 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 22148e5..347158c 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.collectionToString;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -67,7 +68,6 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -871,7 +871,6 @@
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
- String rsis = replicationServerInfos.toString();
ReplicationServerInfo replicationServerInfo = null;
@@ -1030,22 +1029,18 @@
this.getGenerationID()) ||
(replicationServerInfo.getGenerationId() == -1))
{
- Message message =
- NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
- baseDn.toString(),
- Integer.toString(rsServerId),
- replicationServer,
- Integer.toString(serverId),
- Long.toString(this.getGenerationID()));
+ Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
+ .get(serverId, rsServerId, baseDn,
+ session.getReadableRemoteAddress(),
+ getGenerationID());
logError(message);
} else
{
- Message message =
- NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
- baseDn.toString(),
- replicationServer,
- Long.toString(this.getGenerationID()),
- Long.toString(replicationServerInfo.getGenerationId()));
+ Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
+ .get(serverId, rsServerId, baseDn,
+ session.getReadableRemoteAddress(),
+ getGenerationID(),
+ replicationServerInfo.getGenerationId());
logError(message);
}
} else
@@ -1058,9 +1053,22 @@
{
connectionError = true;
connectPhaseLock.notify();
- Message message =
- NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis);
- logError(message);
+
+ if (replicationServerInfos.size() > 0)
+ {
+ Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
+ serverId,
+ baseDn,
+ collectionToString(replicationServerInfos.keySet(),
+ ", "));
+ logError(message);
+ }
+ else
+ {
+ Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
+ serverId, baseDn);
+ logError(message);
+ }
}
}
}
@@ -1149,30 +1157,33 @@
}
}
+
+
/**
- * Connect to the provided server performing the first phase handshake
- * (start messages exchange) and return the reply message from the replication
+ * Connect to the provided server performing the first phase handshake (start
+ * messages exchange) and return the reply message from the replication
* server, wrapped in a ReplicationServerInfo object.
*
- * @param server Server to connect to.
- * @param keepConnection Do we keep session opened or not after handshake.
- * Use true if want to perform handshake phase 2 with the same session
- * and keep the session to create as the current one.
- * @return The answer from the server . Null if could not
- * get an answer.
+ * @param server
+ * Server to connect to.
+ * @param keepConnection
+ * Whether to keep the session open after the handshake. Use true to
+ * perform handshake phase 2 with the same session and to keep the
+ * created session as the current one.
+ * @return The answer from the server, or null if no answer could be obtained.
*/
- private ReplicationServerInfo performPhaseOneHandshake(String server,
- boolean keepConnection)
+ private ReplicationServerInfo performPhaseOneHandshake(
+ String server, boolean keepConnection)
{
- ReplicationServerInfo replServerInfo = null;
-
- // Parse server string.
int separator = server.lastIndexOf(':');
String port = server.substring(separator + 1);
String hostname = server.substring(0, separator);
- ProtocolSession localSession = null;
- boolean error = false;
+ ProtocolSession localSession = null;
+ Socket socket = null;
+ boolean hasConnected = false;
+ Message errorMessage = null;
+
try
{
/*
@@ -1180,132 +1191,131 @@
*/
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
- InetAddress.getByName(hostname), intPort);
- Socket socket = new Socket();
+ InetAddress.getByName(hostname), intPort);
+ socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
socket.connect(serverAddr, 500);
- localSession = replSessionSecurity.createClientSession(server, socket,
- ReplSessionSecurity.HANDSHAKE_TIMEOUT);
- boolean isSslEncryption =
- replSessionSecurity.isSslEncryption(server);
- /*
- * Send our ServerStartMsg.
- */
- ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
- maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
- isSslEncryption,
- groupId);
+ localSession = replSessionSecurity.createClientSession(
+ socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT);
+ boolean isSslEncryption = replSessionSecurity
+ .isSslEncryption(server);
+
+ // Send our ServerStartMsg.
+ ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
+ baseDn, maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(),
+ this.getGenerationID(), isSslEncryption, groupId);
localSession.publish(serverStartMsg);
- /*
- * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
- * back.
- */
+ // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
+ // come back.
ReplicationMsg msg = localSession.receive();
-
if (debugEnabled())
{
- debugInfo("In RB for " + baseDn +
- "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
- "\nAND RECEIVED:\n" + msg.toString());
+ debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+ + serverStartMsg.toString() + "\nAND RECEIVED:\n"
+ + msg.toString());
}
// Wrap received message in a server info object
- replServerInfo = ReplicationServerInfo.newInstance(msg);
+ ReplicationServerInfo replServerInfo = ReplicationServerInfo
+ .newInstance(msg);
// Sanity check
String repDn = replServerInfo.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
- Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
- this.baseDn);
- logError(message);
- error = true;
+ errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
+ this.baseDn);
+ return null;
}
/*
- * We have sent our own protocol version to the replication server.
- * The replication server will use the same one (or an older one
- * if it is an old replication server).
+ * We have sent our own protocol version to the replication server. The
+ * replication server will use the same one (or an older one if it is an
+ * old replication server).
*/
- protocolVersion = ProtocolVersion.minWithCurrent(
- replServerInfo.getProtocolVersion());
+ protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo
+ .getProtocolVersion());
localSession.setProtocolVersion(protocolVersion);
-
if (!isSslEncryption)
{
localSession.stopEncryption();
}
- } catch (ConnectException e)
- {
- /*
- * There was no server waiting on this host:port
- * Log a notice and try the next replicationServer in the list
- */
- if (!connectionError)
- {
- Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
- if (keepConnection) // Log error message only for final connection
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- logError(message);
- } else if (debugEnabled())
- {
- debugInfo(message.toString());
- }
- }
- error = true;
- } catch (Exception e)
- {
- if ((e instanceof SocketTimeoutException) && debugEnabled())
- {
- debugInfo("Timeout trying to connect to RS " + server +
- " for dn: " + baseDn);
- }
- Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
- baseDn, server, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
- if (keepConnection) // Log error message only for final connection
- {
- logError(message);
- } else if (debugEnabled())
- {
- debugInfo(message.toString());
- }
- error = true;
- }
- // Close session if requested
- if (!keepConnection || error)
- {
- if (localSession != null)
+ hasConnected = true;
+
+ // If this connection is the one to use for sending and receiving
+ // updates, store it.
+ if (keepConnection)
{
- if (debugEnabled()) {
- debugInfo("In RB, closing session after phase 1");
+ session = localSession;
+ }
+
+ return replServerInfo;
+ }
+ catch (ConnectException e)
+ {
+ errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
+ server, baseDn);
+ return null;
+ }
+ catch (SocketTimeoutException e)
+ {
+ errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId,
+ server, baseDn);
+ return null;
+ }
+ catch (Exception e)
+ {
+ errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+ server, baseDn, stackTraceToSingleLineString(e));
+ return null;
+ }
+ finally
+ {
+ if (!hasConnected || !keepConnection)
+ {
+ if (localSession != null)
+ {
+ localSession.close();
}
- localSession.close();
- localSession = null;
+ if (socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ // Ignore.
+ }
+ }
}
- if (error)
+
+ if (!hasConnected && errorMessage != null)
{
- replServerInfo = null;
- } // Be sure to return null.
+ // The handshake failed (for example, there was no server waiting on this
+ // host:port). Log a notice and try the next replicationServer in the list.
+ if (!connectionError)
+ {
+ if (keepConnection) // Log error message only for final connection
+ {
+ // the error message is only logged once to avoid overflowing
+ // the error log
+ logError(errorMessage);
+ }
+ if (debugEnabled())
+ {
+ debugInfo(errorMessage.toString());
+ }
+ }
+ }
}
-
- // If this connection as the one to use for sending and receiving updates,
- // store it.
- if (keepConnection)
- {
- session = localSession;
- }
-
- return replServerInfo;
}
/**
@@ -1323,6 +1333,8 @@
private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
boolean keepConnection)
{
+ // FIXME: this should be merged with performPhaseOneHandshake to avoid
+ // code/bug duplication.
ReplServerStartDSMsg replServerStartDSMsg = null;
// Parse server string.
@@ -1344,7 +1356,7 @@
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
socket.connect(serverAddr, 500);
- localSession = replSessionSecurity.createClientSession(server, socket,
+ localSession = replSessionSecurity.createClientSession(socket,
ReplSessionSecurity.HANDSHAKE_TIMEOUT);
boolean isSslEncryption =
replSessionSecurity.isSslEncryption(server);
@@ -1400,7 +1412,9 @@
*/
if (!connectionError)
{
- Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+ Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
+ server, baseDn);
+
if (keepConnection) // Log error message only for final connection
{
// the error message is only logged once to avoid overflowing
@@ -1419,9 +1433,8 @@
debugInfo("Timeout trying to connect to RS " + server +
" for dn: " + baseDn);
}
- Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
- baseDn, server, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
+ Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+ server, baseDn, stackTraceToSingleLineString(e));
if (keepConnection) // Log error message only for final connection
{
logError(message);
@@ -1499,9 +1512,8 @@
} catch (Exception e)
{
- Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
- baseDn, server, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
+ Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+ server, baseDn, stackTraceToSingleLineString(e));
logError(message);
if (session != null)
@@ -1573,9 +1585,8 @@
} catch (Exception e)
{
- Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
- baseDn, server, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
+ Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+ server, baseDn, stackTraceToSingleLineString(e));
logError(message);
if (session != null)
@@ -2196,15 +2207,8 @@
// Start a heartbeat monitor thread.
if (heartbeatInterval > 0)
{
- String threadName = "Replica DS("
- + this.getServerId() + ") heartbeat monitor for domain \""
- + this.baseDn + "\" from RS(" + this.getRsServerId()
- + ") at " + session.getReadableRemoteAddress();
-
- heartbeatMonitor = new HeartbeatMonitor(
- threadName,
- session,
- heartbeatInterval);
+ heartbeatMonitor = new HeartbeatMonitor(getServerId(),
+ getRsServerId(), baseDn, session, heartbeatInterval);
heartbeatMonitor.start();
}
}
@@ -2496,7 +2500,11 @@
reStart(null, true);
}
- ProtocolSession failingSession = session;
+ // Save session information for later in case we need it for log messages
+ // after the session has been closed and/or failed.
+ final ProtocolSession failingSession = session;
+ final int replicationServerID = rsServerId;
+
try
{
ReplicationMsg msg = session.receive();
@@ -2530,11 +2538,12 @@
/*
* RS performs a proper disconnection
*/
- Message message =
- NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
- Integer.toString(rsServerId), baseDn.toString(),
- Integer.toString(serverId));
+ Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
+ .get(replicationServerID,
+ failingSession.getReadableRemoteAddress(),
+ serverId, baseDn);
logError(message);
+
// Try to find a suitable RS
this.reStart(failingSession, true);
} else if (msg instanceof MonitorMsg)
@@ -2597,12 +2606,10 @@
{
bestServerId = bestServerInfo.getServerId();
}
- Message message =
- NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
- Integer.toString(serverId),
- Integer.toString(rsServerId),
- rsServerUrl,
- Integer.toString(bestServerId));
+ Message message = NOTE_NEW_BEST_REPLICATION_SERVER
+ .get(serverId, replicationServerID,
+ failingSession.getReadableRemoteAddress(),
+ bestServerId, baseDn);
logError(message);
reStart(true);
}
@@ -2632,10 +2639,9 @@
/*
* We did not initiate the close on our side, log an error message.
*/
- Message message =
- ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
- Integer.toString(rsServerId), baseDn.toString(),
- Integer.toString(serverId));
+ Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
+ .get(serverId, baseDn, replicationServerID,
+ failingSession.getReadableRemoteAddress());
logError(message);
}
if (reconnectOnFailure)
--
Gitblit v1.10.0