From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improve the replica initialization protocol, especially flow control and the handling of connection outages.
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 147 +++++++++++++++++++++++++++++++++++-------------
1 files changed, 107 insertions(+), 40 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 279e6db..f152406 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -316,6 +316,15 @@
}
/**
+ * Set the generation id - for test purpose.
+ * @param generationID The generation id
+ */
+ public void setGenerationID(long generationID)
+ {
+ this.generationID = generationID;
+ }
+
+ /**
* Gets the server url of the RS we are connected to.
* @return The server url of the RS we are connected to
*/
@@ -727,6 +736,15 @@
{
this.locallyConfigured = locallyConfigured;
}
+
+ /**
+ * Returns a string representation of this object.
+ * @return A string representation of this object.
+ */
+ public String toString()
+ {
+ return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
+ }
}
private void connect()
@@ -859,7 +877,8 @@
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
TRACER.debugInfo(
- "phase 2 : will perform PhaseOneH with the preferred RS.");
+ "phase 2 : will perform PhaseOneH with the preferred RS="
+ + replicationServerInfo);
replicationServerInfo = performPhaseOneHandshake(
replicationServerInfo.getServerURL(), true);
@@ -2225,18 +2244,20 @@
/**
* restart the ReplicationBroker.
+ * @param infiniteTry whether to keep retrying until connected or shut down
*/
- public void reStart()
+ public void reStart(boolean infiniteTry)
{
- reStart(this.session);
+ reStart(this.session, infiniteTry);
}
/**
* Restart the ReplicationServer broker after a failure.
*
* @param failingSession the socket which failed
+ * @param infiniteTry whether to keep retrying until connected or shut down
*/
- public void reStart(ProtocolSession failingSession)
+ public void reStart(ProtocolSession failingSession, boolean infiniteTry)
{
if (failingSession != null)
@@ -2268,6 +2289,7 @@
rsGroupId = (byte) -1;
rsServerId = -1;
rsServerUrl = null;
+ session = null;
}
while (!this.connected && (!this.shutdown))
{
@@ -2282,6 +2304,8 @@
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
+ if ((!connected) && (!infiniteTry))
+ break;
if ((!connected) && (!shutdown))
{
try
@@ -2293,6 +2317,11 @@
}
}
}
+ if (debugEnabled())
+ TRACER.debugInfo(this +
+ " end restart : connected=" + connected +
+ " with RSid=" + this.getRsServerId() +
+ " genid=" + this.generationID);
}
/**
@@ -2301,7 +2330,18 @@
*/
public void publish(ReplicationMsg msg)
{
- _publish(msg, false);
+ _publish(msg, false, true);
+ }
+
+ /**
+ * Publish a message to the other servers.
+ * @param msg The message to publish.
+ * @param retryOnFailure Whether reconnect should automatically be done.
+ * @return Whether publish succeeded.
+ */
+ public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
+ {
+ return _publish(msg, false, retryOnFailure);
}
/**
@@ -2310,15 +2350,18 @@
*/
public void publishRecovery(ReplicationMsg msg)
{
- _publish(msg, true);
+ _publish(msg, true, true);
}
/**
* Publish a message to the other servers.
* @param msg the message to publish
* @param recoveryMsg the message is a recovery Message
+ * @param retryOnFailure whether retry should be done on failure
+ * @return whether the message was successfully sent.
*/
- void _publish(ReplicationMsg msg, boolean recoveryMsg)
+ boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
+ boolean retryOnFailure)
{
boolean done = false;
@@ -2338,7 +2381,7 @@
"message is not possible due to existing connection error.");
}
- return;
+ return false;
}
try
@@ -2365,7 +2408,7 @@
// do it.
if (!recoveryMsg & connectRequiresRecovery)
{
- return;
+ return false;
}
if (msg instanceof UpdateMsg)
@@ -2408,6 +2451,9 @@
}
} catch (IOException e)
{
+ if (!retryOnFailure)
+ return false;
+
// The receive threads should handle reconnection or
// mark this broker in error. Just retry.
synchronized (connectPhaseLock)
@@ -2435,6 +2481,7 @@
}
}
}
+ return true;
}
/**
@@ -2450,7 +2497,7 @@
*/
public ReplicationMsg receive() throws SocketTimeoutException
{
- return receive(false);
+ return receive(false, true, false);
}
/**
@@ -2459,22 +2506,29 @@
* called in a single thread or protected by a locking mechanism
* before being called.
*
- * @return the received message
* @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
- * @param allowReconnectionMechanism If true, this allows the reconnection
- * mechanism to disconnect the broker if it detects that it should reconnect
- * to another replication server because of some criteria defined by the
- * algorithm where we choose a suitable replication server.
+ * @param reconnectToTheBestRS Whether broker will automatically switch
+ * to the best suitable RS.
+ * @param reconnectOnFailure Whether broker will automatically reconnect
+ * on failure.
+ * @param returnOnTopoChange Whether broker should return TopologyMsg
+ * received.
+ * @return the received message
+ *
+ * @throws SocketTimeoutException if the timeout set by setSoTimeout
+ * has expired
*/
- public ReplicationMsg receive(boolean allowReconnectionMechanism)
+ public ReplicationMsg receive(boolean reconnectToTheBestRS,
+ boolean reconnectOnFailure, boolean returnOnTopoChange)
throws SocketTimeoutException
{
while (shutdown == false)
{
- if (!connected)
+ if ((reconnectOnFailure) && (!connected))
{
- reStart(null);
+ // infinite try to reconnect
+ reStart(null, true);
}
ProtocolSession failingSession = session;
@@ -2496,11 +2550,16 @@
{
TopologyMsg topoMsg = (TopologyMsg) msg;
receiveTopo(topoMsg);
- if (allowReconnectionMechanism)
+ if (reconnectToTheBestRS)
{
// Reset wait time before next computation of best server
mustRunBestServerCheckingAlgorithm = 0;
}
+
+ // Caller wants to check what's changed
+ if (returnOnTopoChange)
+ return msg;
+
} else if (msg instanceof StopMsg)
{
/*
@@ -2512,7 +2571,7 @@
Integer.toString(serverId));
logError(message);
// Try to find a suitable RS
- this.reStart(failingSession);
+ this.reStart(failingSession, true);
} else if (msg instanceof MonitorMsg)
{
// This is the response to a MonitorRequest that was sent earlier or
@@ -2551,7 +2610,7 @@
// it is still the one we are currently connected to. If not,
// disconnect properly and let the connection algorithm re-connect to
// best replication server
- if (allowReconnectionMechanism)
+ if (reconnectToTheBestRS)
{
mustRunBestServerCheckingAlgorithm++;
if (mustRunBestServerCheckingAlgorithm == 2)
@@ -2572,9 +2631,10 @@
NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
Integer.toString(serverId),
Integer.toString(rsServerId),
- rsServerUrl);
+ rsServerUrl,
+ Integer.toString(bestServerInfo.getServerId()));
logError(message);
- reStart();
+ reStart(null, true);
}
// Reset wait time before next computation of best server
@@ -2603,10 +2663,13 @@
Integer.toString(serverId));
logError(message);
}
- this.reStart(failingSession);
+ if (reconnectOnFailure)
+ reStart(failingSession, true);
+ else
+ break; // does not seem necessary to explicitly disconnect ..
}
}
- }
+ } // while !shutdown
return null;
}
@@ -2676,11 +2739,10 @@
public void stop()
{
if (debugEnabled())
- {
debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
" close the connection to replication server " + rsServerId + " for" +
" domain " + baseDn);
- }
+
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
replicationServer = "stopped";
@@ -2690,25 +2752,17 @@
rsServerId = -1;
rsServerUrl = null;
- if (session != null)
+ try
{
if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// V4 protocol introduces a StopMsg to properly end communications
- try
- {
session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
}
- try
- {
- session.close();
- } catch (IOException e)
- {
- }
+ session.close();
+ } catch (Exception e)
+ {
+ // Anyway, going to close session, so nothing to do
}
}
@@ -2979,6 +3033,9 @@
*/
public void receiveTopo(TopologyMsg topoMsg)
{
+ if (debugEnabled())
+ TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
+
// Store new DS list
dsList = topoMsg.getDsList();
@@ -3100,4 +3157,14 @@
{
connectRequiresRecovery = b;
}
+
+ /**
+ * Returns whether the broker is shutting down.
+ * @return whether the broker is shutting down.
+ */
+ public boolean shuttingDown()
+ {
+ return shutdown;
+ }
+
}
--
Gitblit v1.10.0