From 0e53b0680f39190c20027a489b0f862150f6d80a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 19 Dec 2013 09:24:04 +0000
Subject: [PATCH] First step towards simplifying the ReplicationBroker class.
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 370 ++++++++++++++++++++++++++++------------------------
1 files changed, 200 insertions(+), 170 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index bc298fa..adc5046 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -36,9 +36,11 @@
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -67,18 +69,95 @@
{
/**
+ * Immutable class containing information about whether the broker is
+ * connected to an RS and data associated to this connected RS.
+ * <p>
+ * Mutable methods return a new version of this object copying the data that
+ * did not change.
+ */
+ // @Immutable
+ private static final class ConnectedRS
+ {
+
+ private final String replicationServer;
+ /** The info of the RS we are connected to. */
+ private final ReplicationServerInfo rsInfo;
+ private final boolean connected;
+
+ private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo,
+ String replicationServer)
+ {
+ this.connected = connected;
+ this.rsInfo = rsInfo;
+ this.replicationServer = replicationServer;
+ }
+
+ private static ConnectedRS stopped()
+ {
+ return new ConnectedRS(false, null, "stopped");
+ }
+
+ private static ConnectedRS noConnectedRS()
+ {
+ return new ConnectedRS(false, null, NO_CONNECTED_SERVER);
+ }
+
+ /**
+ * Returns a new version of the current object with the connected status set
+ * to true.
+ */
+ private ConnectedRS setConnected()
+ {
+ return new ConnectedRS(true, rsInfo, replicationServer);
+ }
+
+ public int getServerId()
+ {
+ return rsInfo != null ? rsInfo.getServerId() : -1;
+ }
+
+ private byte getGroupId()
+ {
+ return rsInfo != null ? rsInfo.getGroupId() : -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ final StringBuilder sb = new StringBuilder();
+ toString(sb);
+ return sb.toString();
+ }
+
+ public void toString(StringBuilder sb)
+ {
+ sb.append("connected=").append(connected).append(", ");
+ if (rsInfo == null) // this is a null object
+ {
+ sb.append("no connected RS");
+ }
+ else
+ {
+ sb.append("connected RS(serverId=").append(rsInfo.getServerId())
+ .append(", serverUrl=").append(rsInfo.getServerURL())
+ .append(", groupId=").append(rsInfo.getGroupId())
+ .append(")");
+ }
+ }
+ }
+
+ /**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
private volatile boolean shutdown = false;
private final Object startStopLock = new Object();
private volatile ReplicationDomainCfg config;
- private volatile boolean connected = false;
/**
* String reported under CSN=monitor when there is no connected RS.
*/
public final static String NO_CONNECTED_SERVER = "Not connected";
- private volatile String replicationServer = NO_CONNECTED_SERVER;
private volatile Session session;
private final ServerState state;
private Semaphore sendWindow;
@@ -88,19 +167,15 @@
private int timeout = 0;
private short protocolVersion;
private ReplSessionSecurity replSessionSecurity;
- /** The group id of the RS we are connected to. */
- private byte rsGroupId = -1;
- /** The server id of the RS we are connected to. */
- private int rsServerId = -1;
- /** The server URL of the RS we are connected to. */
- private String rsServerUrl;
+ private final AtomicReference<ConnectedRS> connectedRS =
+ new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
/** Our replication domain. */
private ReplicationDomain domain;
/**
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
*/
- private final MutableBoolean monitorResponse = new MutableBoolean(false);
+ private final AtomicBoolean monitorResponse = new AtomicBoolean(false);
/**
* A Map containing the ServerStates of all the replicas in the topology
* as seen by the ReplicationServer the last time it was polled or the last
@@ -217,7 +292,7 @@
{
shutdown = false;
this.rcvWindow = getMaxRcvWindow();
- connect();
+ connect(connectedRS.get());
}
}
@@ -227,7 +302,7 @@
*/
public byte getRsGroupId()
{
- return rsGroupId;
+ return connectedRS.get().getGroupId();
}
/**
@@ -236,7 +311,7 @@
*/
public int getRsServerId()
{
- return rsServerId;
+ return connectedRS.get().getServerId();
}
/**
@@ -287,20 +362,11 @@
}
/**
- * Gets the server url of the RS we are connected to.
- * @return The server url of the RS we are connected to
- */
- public String getRsServerUrl()
- {
- return rsServerUrl;
- }
-
- /**
* Sets the locally configured flag for the passed ReplicationServerInfo
* object, analyzing the local configuration.
* @param rsInfo the Replication server to check and update
*/
- private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo)
+ private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
{
// Determine if the passed ReplicationServerInfo has a URL that is present
// in the locally configured replication servers
@@ -678,13 +744,14 @@
}
}
- private void connect()
+ private void connect(ConnectedRS rs)
{
if (getBaseDN().toNormalizedString().equalsIgnoreCase(
ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
{
- connectAsECL();
- } else
+ connectAsECL(rs);
+ }
+ else
{
connectAsDataServer();
}
@@ -697,8 +764,8 @@
*/
private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
{
- Map<Integer, ReplicationServerInfo> rsInfos =
- new ConcurrentHashMap<Integer, ReplicationServerInfo>();
+ final Map<Integer, ReplicationServerInfo> rsInfos =
+ new ConcurrentSkipListMap<Integer, ReplicationServerInfo>();
for (String serverUrl : getReplicationServerUrls())
{
@@ -732,14 +799,13 @@
* </li>
* </ul>
*/
- private void connectAsECL()
+ private void connectAsECL(ConnectedRS rs)
{
// FIXME:ECL List of RS to connect is for now limited to one RS only
- String bestServer = getReplicationServerUrls().iterator().next();
-
+ final String bestServer = getReplicationServerUrls().iterator().next();
if (performPhaseOneHandshake(bestServer, true, true) != null)
{
- performECLPhaseTwoHandshake(bestServer);
+ performECLPhaseTwoHandshake(bestServer, rs);
}
}
@@ -808,14 +874,16 @@
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
- ReplicationServerInfo electedRsInfo = null;
-
- if (replicationServerInfos.size() > 0)
+ if (replicationServerInfos.isEmpty())
+ {
+ connectedRS.set(ConnectedRS.noConnectedRS());
+ }
+ else
{
// At least one server answered, find the best one.
RSEvaluations evals = computeBestReplicationServer(true, -1, state,
replicationServerInfos, serverId, getGroupId(), getGenerationID());
- electedRsInfo = evals.getBestRS();
+ ReplicationServerInfo electedRsInfo = evals.getBestRS();
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
@@ -850,43 +918,33 @@
{
connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
} // Could perform handshake phase 2 with best
-
} // Could perform handshake phase 1 with best
+ }
- } // Reached some servers
-
- // connected is set by connectToReplicationServer()
- // and electedRsInfo isn't null then. Check anyway
- if (electedRsInfo != null && connected)
+ final ConnectedRS rs = connectedRS.get();
+ if (rs.connected)
{
connectPhaseLock.notify();
- if ((electedRsInfo.getGenerationId() == getGenerationID())
- || (electedRsInfo.getGenerationId() == -1))
+ final long rsGenId = rs.rsInfo.getGenerationId();
+ final int rsServerId = rs.rsInfo.getServerId();
+ if (rsGenId == getGenerationID() || rsGenId == -1)
{
- Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
- .get(serverId, rsServerId, baseDN.toNormalizedString(),
- session.getReadableRemoteAddress(),
- getGenerationID());
- logError(message);
- } else
- {
- Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
- .get(serverId, rsServerId, baseDN.toNormalizedString(),
- session.getReadableRemoteAddress(),
- getGenerationID(),
- electedRsInfo.getGenerationId());
- logError(message);
+ logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
+ serverId, rsServerId, baseDN.toNormalizedString(),
+ session.getReadableRemoteAddress(), getGenerationID()));
}
- } else
+ else
+ {
+ logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
+ serverId, rsServerId, baseDN.toNormalizedString(),
+ session.getReadableRemoteAddress(), getGenerationID(), rsGenId));
+ }
+ }
+ else
{
- /*
- * This server could not find any replicationServer. It's going to start
- * in degraded mode. Log a message.
- */
- connected = false;
- replicationServer = NO_CONNECTED_SERVER;
-
+ // This server could not find any replicationServer.
+ // It's going to start in degraded mode. Log a message.
if (!connectionError)
{
connectionError = true;
@@ -894,16 +952,14 @@
if (replicationServerInfos.size() > 0)
{
- Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
+ logError(WARN_COULD_NOT_FIND_CHANGELOG.get(
serverId, baseDN.toNormalizedString(),
- collectionToString(replicationServerInfos.keySet(), ", "));
- logError(message);
+ collectionToString(replicationServerInfos.keySet(), ", ")));
}
else
{
- Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
- serverId, baseDN.toNormalizedString());
- logError(message);
+ logError(WARN_NO_AVAILABLE_CHANGELOGS.get(
+ serverId, baseDN.toNormalizedString()));
}
}
}
@@ -925,13 +981,11 @@
{
final int serverId = getServerId();
final DN baseDN = getBaseDN();
+
+ ConnectedRS rs = null;
try
{
- replicationServer = session.getReadableRemoteAddress();
maxSendWindow = rsInfo.getWindowSize();
- rsGroupId = rsInfo.getGroupId();
- rsServerId = rsInfo.getServerId();
- rsServerUrl = rsInfo.getServerURL();
receiveTopo(topologyMsg);
@@ -964,7 +1018,8 @@
}
sendWindow = new Semaphore(maxSendWindow);
rcvWindow = getMaxRcvWindow();
- connected = true;
+ rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress());
+ connectedRS.set(rs);
/*
May have created a broker with null replication domain for
@@ -977,18 +1032,17 @@
}
final byte groupId = getGroupId();
- if (getRsGroupId() != groupId)
+ if (rs.getGroupId() != groupId)
{
/*
Connected to replication server with wrong group id:
warn user and start heartbeat monitor to recover when a server
with the right group id shows up.
*/
- Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
- Byte.toString(groupId), Integer.toString(rsServerId),
- rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
- baseDN.toNormalizedString(), Integer.toString(serverId));
- logError(message);
+ logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+ Byte.toString(groupId), Integer.toString(rs.getServerId()),
+ rsInfo.getServerURL(), Byte.toString(rs.getGroupId()),
+ baseDN.toNormalizedString(), Integer.toString(serverId)));
}
startRSHeartBeatMonitoring();
if (rsInfo.getProtocolVersion() >=
@@ -1006,8 +1060,9 @@
}
finally
{
- if (!connected)
+ if (rs == null)
{
+ connectedRS.set(ConnectedRS.noConnectedRS());
setSession(null);
}
}
@@ -1108,8 +1163,9 @@
boolean isSslEncryption = replSessionSecurity.isSslEncryption();
// Send our ServerStartMsg.
- String url = socket.getLocalAddress().getHostName() + ":"
- + socket.getLocalPort();
+ final HostPort hp = new HostPort(
+ socket.getLocalAddress().getHostName(), socket.getLocalPort());
+ String url = hp.toString();
StartMsg serverStartMsg;
if (!isECL)
{
@@ -1135,11 +1191,11 @@
}
// Wrap received message in a server info object
- ReplicationServerInfo replServerInfo = ReplicationServerInfo
- .newInstance(msg, server);
+ final ReplicationServerInfo replServerInfo =
+ ReplicationServerInfo.newInstance(msg, server);
// Sanity check
- DN repDN = replServerInfo.getBaseDN();
+ final DN repDN = replServerInfo.getBaseDN();
if (!getBaseDN().equals(repDN))
{
errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
@@ -1167,7 +1223,7 @@
hasConnected = true;
- // If this connection as the one to use for sending and receiving
+ // If this connection is the one to use for sending and receiving
// updates, store it.
if (keepConnection)
{
@@ -1180,20 +1236,17 @@
{
errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
server, getBaseDN().toNormalizedString());
- return null;
}
catch (SocketTimeoutException e)
{
errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
server, getBaseDN().toNormalizedString());
- return null;
}
catch (Exception e)
{
errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
server, getBaseDN().toNormalizedString(),
stackTraceToSingleLineString(e));
- return null;
}
finally
{
@@ -1203,26 +1256,28 @@
close(socket);
}
- if (!hasConnected && errorMessage != null)
+ if (keepConnection && !hasConnected)
{
- // There was no server waiting on this host:port Log a notice and try
- // the next replicationServer in the list
- if (!connectionError)
- {
- if (keepConnection) // Log error message only for final connection
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- logError(errorMessage);
- }
+ connectedRS.set(ConnectedRS.noConnectedRS());
+ }
- if (debugEnabled())
- {
- TRACER.debugInfo(errorMessage.toString());
- }
+ if (!hasConnected && errorMessage != null && !connectionError)
+ {
+ // There was no server waiting on this host:port
+ // Log a notice and will try the next replicationServer in the list
+ if (keepConnection) // Log error message only for final connection
+ {
+ // log the error message only once to avoid overflowing the error log
+ logError(errorMessage);
+ }
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo(errorMessage.toString());
}
}
}
+ return null;
}
@@ -1234,7 +1289,7 @@
*
* @param server Server we are connecting with.
*/
- private void performECLPhaseTwoHandshake(String server)
+ private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
{
try
{
@@ -1252,14 +1307,16 @@
// Alright set the timeout to the desired value
localSession.setSoTimeout(timeout);
- connected = true;
- } catch (Exception e)
+ connectedRS.set(rs.setConnected());
+ }
+ catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
getServerId(), server, getBaseDN().toNormalizedString(),
stackTraceToSingleLineString(e));
logError(message);
+ connectedRS.set(ConnectedRS.noConnectedRS());
setSession(null);
}
}
@@ -1287,8 +1344,7 @@
// unit test purpose.
if (domain != null)
{
- startSessionMsg =
- new StartSessionMsg(
+ startSessionMsg = new StartSessionMsg(
initStatus,
domain.getRefUrls(),
domain.isAssured(),
@@ -1306,9 +1362,7 @@
final Session localSession = session;
localSession.publish(startSessionMsg);
- /*
- * Read the TopologyMsg that should come back.
- */
+ // Read the TopologyMsg that should come back.
final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
if (debugEnabled())
@@ -1320,13 +1374,15 @@
// Alright set the timeout to the desired value
localSession.setSoTimeout(timeout);
return topologyMsg;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
getServerId(), server, getBaseDN().toNormalizedString(),
stackTraceToSingleLineString(e));
logError(message);
+ connectedRS.set(ConnectedRS.noConnectedRS());
setSession(null);
// Be sure to return null.
@@ -2263,13 +2319,14 @@
numLostConnections++;
}
+ ConnectedRS rs;
if (failingSession == session)
{
- connected = false;
- rsGroupId = -1;
- rsServerId = -1;
- rsServerUrl = null;
+ rs = ConnectedRS.noConnectedRS();
+ connectedRS.set(rs);
setSession(null);
+ } else {
+ rs = connectedRS.get();
}
while (true)
@@ -2277,14 +2334,15 @@
// Synchronize inside the loop in order to allow shutdown.
synchronized (startStopLock)
{
- if (connected || shutdown)
+ if (rs.connected || shutdown)
{
break;
}
try
{
- connect();
+ connect(rs);
+ rs = connectedRS.get();
}
catch (Exception e)
{
@@ -2295,11 +2353,10 @@
logError(mb.toMessage());
}
- if (connected || !infiniteTry)
+ if (rs.connected || !infiniteTry)
{
break;
}
-
}
try
{
@@ -2313,8 +2370,8 @@
if (debugEnabled())
{
- debugInfo("end restart : connected=" + connected + " with RS("
- + getRsServerId() + ") genId=" + this.generationID);
+ debugInfo("end restart : connected=" + rs.connected + " with RS("
+ + rs.getServerId() + ") genId=" + generationID);
}
}
@@ -2533,7 +2590,8 @@
{
while (!shutdown)
{
- if (reconnectOnFailure && !connected)
+ final ConnectedRS rs = connectedRS.get();
+ if (reconnectOnFailure && !rs.connected)
{
// infinite try to reconnect
reStart(null, true);
@@ -2550,7 +2608,7 @@
final int serverId = getServerId();
final DN baseDN = getBaseDN();
- final int previousRsServerID = rsServerId;
+ final int previousRsServerID = rs.getServerId();
try
{
ReplicationMsg msg = localSession.receive();
@@ -2786,19 +2844,15 @@
public void stop()
{
if (debugEnabled())
- debugInfo("is stopping and will close the connection to"
- + " replication server " + rsServerId);
+ debugInfo("is stopping and will close the connection to RS("
+ + getRsServerId() + ")");
synchronized (startStopLock)
{
shutdown = true;
- connected = false;
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
- replicationServer = "stopped";
- rsGroupId = -1;
- rsServerId = -1;
- rsServerUrl = null;
+ connectedRS.set(ConnectedRS.stopped());
setSession(null);
deregisterReplicationMonitor();
}
@@ -2834,7 +2888,7 @@
*/
public String getReplicationServer()
{
- return replicationServer;
+ return connectedRS.get().replicationServer;
}
/**
@@ -2874,7 +2928,7 @@
*/
public int getCurrentSendWindow()
{
- if (connected)
+ if (isConnected())
{
return sendWindow.availablePermits();
}
@@ -2934,7 +2988,7 @@
*/
public boolean isConnected()
{
- return connected;
+ return connectedRS.get().connected;
}
/**
@@ -3003,7 +3057,7 @@
{
List<Integer> connectedDSs = new ArrayList<Integer>();
- if (rsServerId == rsId)
+ if (getRsServerId() == rsId)
{
/*
If we are computing connected DSs for the RS we are connected
@@ -3044,15 +3098,13 @@
{
int rsId = rsInfo.getId();
rssToKeep.add(rsId); // Mark this server as still existing
- List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+ final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
if (rsInfo2 == null)
{
// New replication server, create info for it add it to the list
rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
- // Set the locally configured flag for this new RS only if it is
- // configured
- updateRSInfoLocallyConfiguredStatus(rsInfo2);
+ setLocallyConfiguredFlag(rsInfo2);
replicationServerInfos.put(rsId, rsInfo2);
} else
{
@@ -3061,21 +3113,9 @@
}
}
- /**
- * Now remove any replication server that may have disappeared from the
- * topology.
- */
- Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator();
- while (rsInfoIt.hasNext())
- {
- final Integer rsId = rsInfoIt.next();
- if (!rssToKeep.contains(rsId))
- {
- // This replication server has quit the topology, remove it from the
- // list
- rsInfoIt.remove();
- }
- }
+ // Remove any replication server that may have disappeared from the topology
+ replicationServerInfos.keySet().retainAll(rssToKeep);
+
if (domain != null)
{
for (DSInfo info : dsList)
@@ -3231,18 +3271,8 @@
.append(getServerId()).append("\",")
.append(" groupId=").append(getGroupId())
.append(", genId=").append(generationID)
- .append(", connected=").append(connected).append(", ");
- if (rsServerId == -1)
- {
- sb.append("no RS");
- }
- else
- {
- sb.append("bestRS(serverId=").append(rsServerId)
- .append(", serverUrl=").append(rsServerUrl)
- .append(", groupId=").append(rsGroupId)
- .append(")");
- }
+ .append(", ");
+ connectedRS.get().toString(sb);
return sb.toString();
}
--
Gitblit v1.10.0