From c63e1f305327734be21f5ce0e21bdd2f7a4d143b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 10:32:47 +0000
Subject: [PATCH] Enforced ReplicationServerDomain responsibilities by increasing encapsulation.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 19 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 69 +----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 234 ++++++++-----------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 255 +++++++++++----------
opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 99 +++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 33 -
6 files changed, 308 insertions(+), 401 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 8ec6405..4b25380 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -229,14 +229,6 @@
return newStatus;
}
- private void createStatusAnalyzer()
- {
- if (!replicationServerDomain.isRunningStatusAnalyzer())
- {
- replicationServerDomain.startStatusAnalyzer();
- }
- }
-
/**
* Retrieves a set of attributes containing monitor data that should be
* returned to the client if the corresponding monitor entry is requested.
@@ -457,8 +449,7 @@
localGenerationId = replicationServerDomain.getGenerationId();
oldGenerationId = localGenerationId;
- // Duplicate server ?
- if (!replicationServerDomain.checkForDuplicateDS(this))
+ if (replicationServerDomain.isAlreadyConnectedToDS(this))
{
abortStart(null);
return;
@@ -468,7 +459,6 @@
{
StartMsg outStartMsg = sendStartToRemote();
- // log
logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
// The session initiator decides whether to use SSL.
@@ -508,11 +498,8 @@
throw new DirectoryException(ResultCode.OTHER, null, null);
}
- // Create the status analyzer for the domain if not already started
- createStatusAnalyzer();
-
- // Create the monitoring publisher for the domain if not already started
- createMonitoringPublisher();
+ replicationServerDomain.startStatusAnalyzer();
+ replicationServerDomain.startMonitoringPublisher();
registerIntoDomain();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index d02fe4a..9a136b5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -399,11 +399,7 @@
for (ReplicationServerDomain domain : getReplicationServerDomains())
{
// Create a normalized set of server URLs.
- final Set<String> connectedReplServers = new HashSet<String>();
- for (String url : domain.getChangelogs())
- {
- connectedReplServers.add(normalizeServerURL(url));
- }
+ final Set<String> connectedRSUrls = getConnectedRSUrls(domain);
/*
* check that all replication server in the config are in the
@@ -440,7 +436,7 @@
// Don't connect to a server if it is already connected.
final String normalizedServerURL = normalizeServerURL(aServerURL);
- if (connectedReplServers.contains(normalizedServerURL))
+ if (connectedRSUrls.contains(normalizedServerURL))
{
continue;
}
@@ -472,6 +468,16 @@
}
}
+ private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
+ {
+ Set<String> results = new LinkedHashSet<String>();
+ for (ReplicationServerHandler handler : domain.getConnectedRSs().values())
+ {
+ results.add(normalizeServerURL(handler.getServerAddressURL()));
+ }
+ return results;
+ }
+
/**
* Establish a connection to the server with the address and port.
*
@@ -1039,59 +1045,23 @@
// Update threshold value for status analyzers (stop them if requested
// value is 0)
- if (degradedStatusThreshold != configuration
- .getDegradedStatusThreshold())
+ if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
{
- int oldThresholdValue = degradedStatusThreshold;
- degradedStatusThreshold = configuration
- .getDegradedStatusThreshold();
+ degradedStatusThreshold = configuration.getDegradedStatusThreshold();
for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- if (degradedStatusThreshold == 0)
- {
- // Requested to stop analyzers
- domain.stopStatusAnalyzer();
- }
- else if (domain.isRunningStatusAnalyzer())
- {
- // Update the threshold value for this running analyzer
- domain.updateStatusAnalyzer(degradedStatusThreshold);
- }
- else if (oldThresholdValue == 0)
- {
- // Requested to start analyzers with provided threshold value
- if (domain.getConnectedDSs().size() > 0)
- domain.startStatusAnalyzer();
- }
+ domain.updateDegradedStatusThreshold(degradedStatusThreshold);
}
}
// Update period value for monitoring publishers (stop them if requested
// value is 0)
- if (monitoringPublisherPeriod != configuration
- .getMonitoringPeriod())
+ if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
{
- long oldMonitoringPeriod = monitoringPublisherPeriod;
monitoringPublisherPeriod = configuration.getMonitoringPeriod();
for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- if (monitoringPublisherPeriod == 0L)
- {
- // Requested to stop monitoring publishers
- domain.stopMonitoringPublisher();
- }
- else if (domain.isRunningMonitoringPublisher())
- {
- // Update the threshold value for this running monitoring publisher
- domain.updateMonitoringPublisher(monitoringPublisherPeriod);
- }
- else if (oldMonitoringPeriod == 0L)
- {
- // Requested to start monitoring publishers with provided period value
- if ((domain.getConnectedDSs().size() > 0)
- || (domain.getConnectedRSs().size() > 0))
- domain.startMonitoringPublisher();
- }
+ domain.updateMonitoringPeriod(monitoringPublisherPeriod);
}
}
@@ -1126,7 +1096,7 @@
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
- /*
+ /**
* Try and set a sensible URL for this replication server. Since we are
* listening on all addresses there are a couple of potential candidates: 1) a
* matching server url in the replication server's configuration, 2) hostname
@@ -1666,8 +1636,7 @@
}
if (debugEnabled())
- TRACER.debugInfo("In " + this +
- " getEligibleCN() ends with " +
+ TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
" the following domainEligibleCN for each domain :" + debugLog +
" thus CrossDomainEligibleCN=" + eligibleCN +
" ts=" + new Date(eligibleCN.getTime()).toString());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 31156f3..099f608 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -940,14 +940,14 @@
/**
* Stop operations with a list of replication servers.
*
- * @param replServers the replication servers for which
- * we want to stop operations
+ * @param replServerURLs
+ * the replication servers URLs for which we want to stop operations
*/
- public void stopReplicationServers(Collection<String> replServers)
+ public void stopReplicationServers(Collection<String> replServerURLs)
{
for (ReplicationServerHandler handler : connectedRSs.values())
{
- if (replServers.contains(handler.getServerAddressURL()))
+ if (replServerURLs.contains(handler.getServerAddressURL()))
{
stopServer(handler, false);
}
@@ -976,12 +976,13 @@
}
/**
- * Checks that a DS is not connected with same id.
+ * Checks whether it is already connected to a DS with same id.
*
- * @param handler the DS we want to check
- * @return true if this is not a duplicate server
+ * @param handler
+ * the DS we want to check
+ * @return true if this DS is already connected to the current server
*/
- public boolean checkForDuplicateDS(DataServerHandler handler)
+ public boolean isAlreadyConnectedToDS(DataServerHandler handler)
{
if (connectedDSs.containsKey(handler.getServerId()))
{
@@ -991,9 +992,9 @@
connectedDSs.get(handler.getServerId()).toString(),
handler.toString(), handler.getServerId());
logError(message);
- return false;
+ return true;
}
- return true;
+ return false;
}
/**
@@ -1005,6 +1006,7 @@
*/
public void stopServer(ServerHandler handler, boolean shutdown)
{
+ // TODO JNR merge with stopServer(MessageHandler)
if (debugEnabled())
{
TRACER.debugInfo("In "
@@ -1055,22 +1057,9 @@
stopMonitoringPublisher();
}
- if (handler.isReplicationServer())
+ if (connectedRSs.containsKey(handler.getServerId()))
{
- if (connectedRSs.containsKey(handler.getServerId()))
- {
- unregisterServerHandler(handler);
- handler.shutdown();
-
- // Check if generation id has to be reset
- mayResetGenerationId();
- if (!shutdown)
- {
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
- }
- }
+ unregisterServerHandler(handler, shutdown, false);
} else if (connectedDSs.containsKey(handler.getServerId()))
{
// If this is the last DS for the domain,
@@ -1086,25 +1075,10 @@
}
stopStatusAnalyzer();
}
-
- unregisterServerHandler(handler);
- handler.shutdown();
-
- // Check if generation id has to be reset
- mayResetGenerationId();
- if (!shutdown)
- {
- // Update the remote replication servers with our list
- // of connected LDAP servers
- buildAndSendTopoInfoToRSs();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
- }
+ unregisterServerHandler(handler, shutdown, true);
} else if (otherHandlers.contains(handler))
{
- unRegisterHandler(handler);
- handler.shutdown();
+ unregisterOtherHandler(handler);
}
}
catch(Exception e)
@@ -1122,12 +1096,41 @@
}
}
+ private void unregisterOtherHandler(MessageHandler handler)
+ {
+ unRegisterHandler(handler);
+ handler.shutdown();
+ }
+
+ private void unregisterServerHandler(ServerHandler handler, boolean shutdown,
+ boolean isDirectoryServer)
+ {
+ unregisterServerHandler(handler);
+ handler.shutdown();
+
+ // Check if generation id has to be reset
+ mayResetGenerationId();
+ if (!shutdown)
+ {
+ if (isDirectoryServer)
+ {
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ buildAndSendTopoInfoToRSs();
+ }
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
+ }
+ }
+
/**
* Stop the handler.
* @param handler The handler to stop.
*/
public void stopServer(MessageHandler handler)
{
+ // TODO JNR merge with stopServer(ServerHandler, boolean)
if (debugEnabled())
{
TRACER.debugInfo("In "
@@ -1163,8 +1166,7 @@
{
if (otherHandlers.contains(handler))
{
- unRegisterHandler(handler);
- handler.shutdown();
+ unregisterOtherHandler(handler);
}
}
catch(Exception e)
@@ -1269,39 +1271,40 @@
}
/**
- * Checks that a remote RS is not already connected to this hosting RS.
- * @param handler The handler for the remote RS.
+ * Checks whether a remote RS is already connected to this hosting RS.
+ *
+ * @param handler
+ * The handler for the remote RS.
* @return flag specifying whether the remote RS is already connected.
- * @throws DirectoryException when a problem occurs.
+ * @throws DirectoryException
+ * when a problem occurs.
*/
- public boolean checkForDuplicateRS(ReplicationServerHandler handler)
- throws DirectoryException
+ public boolean isAlreadyConnectedToRS(ReplicationServerHandler handler)
+ throws DirectoryException
{
ReplicationServerHandler oldHandler =
- connectedRSs.get(handler.getServerId());
- if (oldHandler != null)
+ connectedRSs.get(handler.getServerId());
+ if (oldHandler == null)
{
- if (oldHandler.getServerAddressURL().equals(
- handler.getServerAddressURL()))
- {
- // this is the same server, this means that our ServerStart messages
- // have been sent at about the same time and 2 connections
- // have been established.
- // Silently drop this connection.
- return false;
- }
- else
- {
- // looks like two replication servers have the same serverId
- // log an error message and drop this connection.
- Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
- localReplicationServer.getMonitorInstanceName(), oldHandler.
- getServerAddressURL(), handler.getServerAddressURL(),
- handler.getServerId());
- throw new DirectoryException(ResultCode.OTHER, message);
- }
+ return false;
}
- return true;
+
+ if (oldHandler.getServerAddressURL().equals(handler.getServerAddressURL()))
+ {
+ // this is the same server, this means that our ServerStart messages
+ // have been sent at about the same time and 2 connections
+ // have been established.
+ // Silently drop this connection.
+ return true;
+ }
+
+ // looks like two replication servers have the same serverId
+ // log an error message and drop this connection.
+ Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
+ localReplicationServer.getMonitorInstanceName(),
+ oldHandler.getServerAddressURL(), handler.getServerAddressURL(),
+ handler.getServerId());
+ throw new DirectoryException(ResultCode.OTHER, message);
}
/**
@@ -1327,21 +1330,6 @@
}
/**
- * Return a Set of String containing the lists of Replication servers
- * connected to this server.
- * @return the set of connected servers
- */
- public Set<String> getChangelogs()
- {
- Set<String> results = new LinkedHashSet<String>();
- for (ReplicationServerHandler handler : connectedRSs.values())
- {
- results.add(handler.getServerAddressURL());
- }
- return results;
- }
-
- /**
* Return a set containing the server that produced update and known by
* this replicationServer from all over the topology,
* whatever directly connected of connected to another RS.
@@ -2861,11 +2849,11 @@
}
/**
- * Starts the status analyzer for the domain.
+ * Starts the status analyzer for the domain if not already started.
*/
public void startStatusAnalyzer()
{
- if (statusAnalyzer == null)
+ if (!isRunningStatusAnalyzer())
{
int degradedStatusThreshold =
localReplicationServer.getDegradedStatusThreshold();
@@ -2880,9 +2868,9 @@
/**
* Stops the status analyzer for the domain.
*/
- public void stopStatusAnalyzer()
+ private void stopStatusAnalyzer()
{
- if (statusAnalyzer != null)
+ if (isRunningStatusAnalyzer())
{
statusAnalyzer.shutdown();
statusAnalyzer.waitForShutdown();
@@ -2894,32 +2882,19 @@
* Tests if the status analyzer for this domain is running.
* @return True if the status analyzer is running, false otherwise.
*/
- public boolean isRunningStatusAnalyzer()
+ private boolean isRunningStatusAnalyzer()
{
return statusAnalyzer != null;
}
/**
- * Update the status analyzer with the new threshold value.
- * @param degradedStatusThreshold The new threshold value.
- */
- public void updateStatusAnalyzer(int degradedStatusThreshold)
- {
- if (statusAnalyzer != null)
- {
- statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
- }
- }
-
- /**
- * Starts the monitoring publisher for the domain.
+ * Starts the monitoring publisher for the domain if not already started.
*/
public void startMonitoringPublisher()
{
- if (monitoringPublisher == null)
+ if (!isRunningMonitoringPublisher())
{
- long period =
- localReplicationServer.getMonitoringPublisherPeriod();
+ long period = localReplicationServer.getMonitoringPublisherPeriod();
if (period > 0) // 0 means no monitoring publisher
{
monitoringPublisher = new MonitoringPublisher(this, period);
@@ -2931,9 +2906,9 @@
/**
* Stops the monitoring publisher for the domain.
*/
- public void stopMonitoringPublisher()
+ private void stopMonitoringPublisher()
{
- if (monitoringPublisher != null)
+ if (isRunningMonitoringPublisher())
{
monitoringPublisher.shutdown();
monitoringPublisher.waitForShutdown();
@@ -2945,24 +2920,12 @@
* Tests if the monitoring publisher for this domain is running.
* @return True if the monitoring publisher is running, false otherwise.
*/
- public boolean isRunningMonitoringPublisher()
+ private boolean isRunningMonitoringPublisher()
{
return monitoringPublisher != null;
}
/**
- * Update the monitoring publisher with the new period value.
- * @param period The new period value.
- */
- public void updateMonitoringPublisher(long period)
- {
- if (monitoringPublisher != null)
- {
- monitoringPublisher.setPeriod(period);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -3384,4 +3347,52 @@
{
return this.localReplicationServer.getServerId();
}
+
+ /**
+ * Update the status analyzer with the new threshold value.
+ *
+ * @param degradedStatusThreshold
+ * The new threshold value.
+ */
+ void updateDegradedStatusThreshold(int degradedStatusThreshold)
+ {
+ if (degradedStatusThreshold == 0)
+ {
+ // Requested to stop analyzers
+ stopStatusAnalyzer();
+ }
+ else if (isRunningStatusAnalyzer())
+ {
+ statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+ }
+ else if (getConnectedDSs().size() > 0)
+ {
+ // Requested to start analyzers with provided threshold value
+ startStatusAnalyzer();
+ }
+ }
+
+ /**
+ * Update the monitoring publisher with the new period value.
+ *
+ * @param period
+ * The new period value.
+ */
+ void updateMonitoringPeriod(long period)
+ {
+ if (period == 0)
+ {
+ // Requested to stop monitoring publishers
+ stopMonitoringPublisher();
+ }
+ else if (isRunningMonitoringPublisher())
+ {
+ monitoringPublisher.setPeriod(period);
+ }
+ else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
+ {
+ // Requested to start monitoring publishers with provided period value
+ startMonitoringPublisher();
+ }
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 6f96bda..1753dd6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -160,7 +160,6 @@
{
lockDomain(false); // no timeout
- // Send start
ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
// Wait answer
@@ -174,22 +173,19 @@
// Remote replication server is probably shutting down or simultaneous
// cross-connect detected.
abortStart(null);
- return;
}
else
{
Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
.getClass().getCanonicalName(), "ReplServerStartMsg");
abortStart(message);
- return;
}
+ return;
}
- // Process hello from remote.
- processStartFromRemote((ReplServerStartMsg)msg);
+ processStartFromRemote((ReplServerStartMsg) msg);
- // Duplicate server ?
- if (!replicationServerDomain.checkForDuplicateRS(this))
+ if (replicationServerDomain.isAlreadyConnectedToRS(this))
{
// Simultaneous cross connect.
abortStart(null);
@@ -207,10 +203,9 @@
generationId, false);
}
- // Log
logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg);
- // Until here session is encrypted then it depends on the negociation
+ // Until here session is encrypted then it depends on the negotiation
// The session initiator decides whether to use SSL.
if (!this.sslEncryption)
session.stopEncryption();
@@ -239,8 +234,7 @@
logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
- // Create the monitoring publisher for the domain if not already started
- createMonitoringPublisher();
+ replicationServerDomain.startMonitoringPublisher();
/*
FIXME: i think this should be done for all protocol version !!
@@ -292,7 +286,6 @@
}
finally
{
- // Release domain
if (replicationServerDomain != null &&
replicationServerDomain.hasLock())
replicationServerDomain.release();
@@ -316,8 +309,7 @@
// lock with timeout
lockDomain(true);
- // Duplicate server ?
- if (!replicationServerDomain.checkForDuplicateRS(this))
+ if (replicationServerDomain.isAlreadyConnectedToRS(this))
{
abortStart(null);
return;
@@ -326,7 +318,6 @@
this.localGenerationId = replicationServerDomain.getGenerationId();
ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
- // log
logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
/*
@@ -358,7 +349,6 @@
.createTopologyMsgForRS();
sendTopoInfo(outTopoMsg);
- // log
logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
}
else
@@ -390,9 +380,7 @@
*/
}
-
- // Create the monitoring publisher for the domain if not already started
- createMonitoringPublisher();
+ replicationServerDomain.startMonitoringPublisher();
registerIntoDomain();
@@ -616,9 +604,7 @@
public void shutdown()
{
super.shutdown();
- /*
- * Stop the remote LSHandler
- */
+ // Stop the remote LSHandler
synchronized (remoteDirectoryServers)
{
for (LightweightServerHandler lsh : remoteDirectoryServers.values())
@@ -755,7 +741,7 @@
attributes.add(Attributes.create("missing-changes",
String.valueOf(md.getMissingChangesRS(serverId))));
- /* get the Server State */
+ // get the Server State
AttributeBuilder builder = new AttributeBuilder("server-state");
ServerState state = md.getRSStates(serverId);
if (state != null)
@@ -769,6 +755,7 @@
return attributes;
}
+
/**
* {@inheritDoc}
*/
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c377e5f..339ce80 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
if (debugEnabled())
TRACER.debugInfo("In " +
((handler != null) ? handler.toString() : "Replication Server") +
- " closing session with err=" +
- providedMsg.toString());
+ " closing session with err=" + providedMsg);
logError(providedMsg);
}
@@ -125,7 +124,7 @@
*/
protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
/**
- // Number of updates received from the server in assured safe data mode.
+ * Number of updates received from the server in assured safe data mode.
*/
protected int assuredSdReceivedUpdates = 0;
/**
@@ -169,7 +168,7 @@
/**
* The initial size of the sending window.
*/
- int sendWindowSize;
+ protected int sendWindowSize;
/**
* remote generation id.
*/
@@ -185,7 +184,7 @@
/**
* Group id of this remote server.
*/
- protected byte groupId = (byte) -1;
+ protected byte groupId = -1;
/**
* The SSL encryption after the negotiation with the peer.
*/
@@ -254,8 +253,7 @@
closeSession(localSession, reason, this);
}
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
+ if (replicationServerDomain != null && replicationServerDomain.hasLock())
replicationServerDomain.release();
// If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
// peer server and the last topo message sent may have failed being
// sent: in that case retrieve old value of generation id for
// replication server domain
- if (oldGenerationId != -100)
+ if (oldGenerationId != -100 && replicationServerDomain != null)
{
- if (replicationServerDomain!=null)
- replicationServerDomain.changeGenerationId(oldGenerationId, false);
+ replicationServerDomain.changeGenerationId(oldGenerationId, false);
}
}
@@ -304,7 +301,6 @@
@Override
public boolean engageShutdown()
{
- // Use thread safe boolean
return shuttingDown.getAndSet(true);
}
@@ -340,13 +336,11 @@
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
- writer = new ServerWriter(session, this,
- replicationServerDomain);
+ writer = new ServerWriter(session, this, replicationServerDomain);
reader = new ServerReader(session, this);
- session.setName("Replication server RS("
- + this.getReplicationServerId()
- + ") session thread to " + this.toString() + " at "
+ session.setName("Replication server RS(" + getReplicationServerId()
+ + ") session thread to " + this + " at "
+ session.getReadableRemoteAddress());
session.start();
try
@@ -366,9 +360,8 @@
// Create a thread to send heartbeat messages.
if (heartbeatInterval > 0)
{
- String threadName = "Replication server RS("
- + this.getReplicationServerId()
- + ") heartbeat publisher to " + this.toString() + " at "
+ String threadName = "Replication server RS(" + getReplicationServerId()
+ + ") heartbeat publisher to " + this + " at "
+ session.getReadableRemoteAddress();
heartbeatThread = new HeartbeatThread(threadName, session,
heartbeatInterval / 3);
@@ -788,7 +781,7 @@
*/
public boolean isReplicationServer()
{
- return (!this.isDataServer());
+ return !this.isDataServer();
}
@@ -827,62 +820,58 @@
// it will be created and locked later in the method
if (!timedout)
{
- // !timedout
if (!replicationServerDomain.hasLock())
replicationServerDomain.lock();
+ return;
}
- else
+
+ /**
+ * Take the lock on the domain.
+ * WARNING: Here we try to acquire the lock with a timeout. This
+ * is for preventing a deadlock that may happen if there are cross
+ * connection attempts (for same domain) from this replication
+ * server and from a peer one:
+ * Here is the scenario:
+ * - RS1 connect thread takes the domain lock and starts
+ * connection to RS2
+ * - at the same time RS2 connect thread takes his domain lock and
+ * start connection to RS2
+ * - RS2 listen thread starts processing received
+ * ReplServerStartMsg from RS1 and wants to acquire the lock on
+ * the domain (here) but cannot as RS2 connect thread already has
+ * it
+ * - RS1 listen thread starts processing received
+ * ReplServerStartMsg from RS2 and wants to acquire the lock on
+ * the domain (here) but cannot as RS1 connect thread already has
+ * it
+ * => Deadlock: 4 threads are locked.
+ * So to prevent that in such situation, the listen threads here
+ * will both timeout trying to acquire the lock. The random time
+ * for the timeout should allow on connection attempt to be
+ * aborted whereas the other one should have time to finish in the
+ * same time.
+ * Warning: the minimum time (3s) should be big enough to allow
+ * normal situation connections to terminate. The added random
+ * time should represent a big enough range so that the chance to
+ * have one listen thread timing out a lot before the peer one is
+ * great. When the first listen thread times out, the remote
+ * connect thread should release the lock and allow the peer
+ * listen thread to take the lock it was waiting for and process
+ * the connection attempt.
+ */
+ Random random = new Random();
+ int randomTime = random.nextInt(6); // Random from 0 to 5
+ // Wait at least 3 seconds + (0 to 5 seconds)
+ long timeout = 3000 + (randomTime * 1000);
+ boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+ if (!lockAcquired)
{
- // timedout
- /**
- * Take the lock on the domain.
- * WARNING: Here we try to acquire the lock with a timeout. This
- * is for preventing a deadlock that may happen if there are cross
- * connection attempts (for same domain) from this replication
- * server and from a peer one:
- * Here is the scenario:
- * - RS1 connect thread takes the domain lock and starts
- * connection to RS2
- * - at the same time RS2 connect thread takes his domain lock and
- * start connection to RS2
- * - RS2 listen thread starts processing received
- * ReplServerStartMsg from RS1 and wants to acquire the lock on
- * the domain (here) but cannot as RS2 connect thread already has
- * it
- * - RS1 listen thread starts processing received
- * ReplServerStartMsg from RS2 and wants to acquire the lock on
- * the domain (here) but cannot as RS1 connect thread already has
- * it
- * => Deadlock: 4 threads are locked.
- * So to prevent that in such situation, the listen threads here
- * will both timeout trying to acquire the lock. The random time
- * for the timeout should allow on connection attempt to be
- * aborted whereas the other one should have time to finish in the
- * same time.
- * Warning: the minimum time (3s) should be big enough to allow
- * normal situation connections to terminate. The added random
- * time should represent a big enough range so that the chance to
- * have one listen thread timing out a lot before the peer one is
- * great. When the first listen thread times out, the remote
- * connect thread should release the lock and allow the peer
- * listen thread to take the lock it was waiting for and process
- * the connection attempt.
- */
- Random random = new Random();
- int randomTime = random.nextInt(6); // Random from 0 to 5
- // Wait at least 3 seconds + (0 to 5 seconds)
- long timeout = 3000 + (randomTime * 1000);
- boolean noTimeout = replicationServerDomain.tryLock(timeout);
- if (!noTimeout)
- {
- // Timeout
- Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
- getBaseDN(),
- serverId,
- session.getReadableRemoteAddress(),
- getReplicationServerId());
- throw new DirectoryException(ResultCode.OTHER, message);
- }
+ Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+ getBaseDN(),
+ serverId,
+ session.getReadableRemoteAddress(),
+ getReplicationServerId());
+ throw new DirectoryException(ResultCode.OTHER, message);
}
}
@@ -1011,9 +1000,7 @@
session.close();
}
- /*
- * Stop the heartbeat thread.
- */
+ // Stop the heartbeat thread.
if (heartbeatThread != null)
{
heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
*/
try
{
- if ((writer != null) && (!(Thread.currentThread().equals(writer))))
+ if (writer != null && !Thread.currentThread().equals(writer))
{
-
writer.join(SHUTDOWN_JOIN_TIMEOUT);
}
- if ((reader != null) && (!(Thread.currentThread().equals(reader))))
+ if (reader != null && !Thread.currentThread().equals(reader))
{
reader.join(SHUTDOWN_JOIN_TIMEOUT);
}
@@ -1068,7 +1054,7 @@
{
// loop until not interrupted
}
- } while (((interrupted) || (!acquired)) && (!shutdownWriter));
+ } while ((interrupted || !acquired) && !shutdownWriter);
if (msg != null)
{
incrementOutCount();
@@ -1078,10 +1064,9 @@
if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
incrementAssuredSrSentUpdates();
- } else
+ } else if (!isDataServer())
{
- if (!isDataServer())
- incrementAssuredSdSentUpdates();
+ incrementAssuredSdSentUpdates();
}
}
}
@@ -1094,22 +1079,10 @@
*/
public RSInfo toRSInfo()
{
-
- return new RSInfo(serverId, serverURL, generationId, groupId,
- weight);
+ return new RSInfo(serverId, serverURL, generationId, groupId, weight);
}
/**
- * Starts the monitoring publisher for the domain if not already started.
- */
- protected void createMonitoringPublisher()
- {
- if (!replicationServerDomain.isRunningMonitoringPublisher())
- {
- replicationServerDomain.startMonitoringPublisher();
- }
- }
- /**
* Update the send window size based on the credit specified in the
* given window message.
*
@@ -1132,11 +1105,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
- "\nAND REPLIED:\n" + outStartMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
+ + "\nAND REPLIED:\n" + outStartMsg);
}
}
@@ -1151,12 +1123,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH START HANDSHAKE SENT("+ this +
- "):\n" + outStartMsg.toString()+
- "\nAND RECEIVED:\n" + inStartMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
+ + inStartMsg);
}
}
@@ -1171,11 +1141,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
- "\nAND REPLIED:\n" + outTopoMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
+ + outTopoMsg);
}
}
@@ -1190,11 +1159,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
- "\nAND RECEIVED:\n" + inTopoMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
+ + inTopoMsg);
}
}
@@ -1209,11 +1177,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
- "\nAND REPLIED:\n" + outTopoMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + " :"
+ + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
+ + "\nAND REPLIED:\n" + outTopoMsg);
}
}
@@ -1224,10 +1191,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + " :"
+ + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
}
}
@@ -1240,11 +1206,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED:\n" +
- inStartECLSessionMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + " :"
+ + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
}
}
@@ -1264,10 +1228,9 @@
*/
public long getReferenceGenId()
{
- long refgenid = -1;
- if (replicationServerDomain!=null)
- refgenid = replicationServerDomain.getGenerationId();
- return refgenid;
+ if (replicationServerDomain != null)
+ return replicationServerDomain.getGenerationId();
+ return -1;
}
/**
@@ -1285,8 +1248,7 @@
* @param update the update message received.
* @throws IOException when it occurs.
*/
- public void put(UpdateMsg update)
- throws IOException
+ public void put(UpdateMsg update) throws IOException
{
if (replicationServerDomain!=null)
replicationServerDomain.put(update, this);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 3b8daa7..222b8c7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -29,11 +29,12 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.types.DebugLogLevel;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
/**
* This thread is in charge of periodically determining if the connected
@@ -85,7 +86,6 @@
}
/**
- * Run method for the StatusAnalyzer.
* Analyzes if servers are late or not, and change their status accordingly.
*/
@Override
@@ -93,13 +93,11 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Directory server status analyzer starting for dn " +
- replicationServerDomain.getBaseDn());
+ TRACER.debugInfo(
+ getMessage("Directory server status analyzer starting."));
}
- final int localRsId = replicationServerDomain.getLocalRSServerId();
- boolean interrupted = false;
- while (!shutdown && !interrupted)
+ while (!shutdown)
{
synchronized (shutdownLock)
{
@@ -126,22 +124,21 @@
// for it and change status accordingly if threshold value is
// crossed/uncrossed
for (DataServerHandler serverHandler :
- replicationServerDomain.getConnectedDSs(). values())
+ replicationServerDomain.getConnectedDSs().values())
{
// Get number of pending changes for this server
int nChanges = serverHandler.getRcvMsgQueueSize();
if (debugEnabled())
{
- TRACER.debugInfo("Status analyzer for dn "
- + replicationServerDomain.getBaseDn() + " DS "
+ TRACER.debugInfo(getMessage("Status analyzer: DS "
+ serverHandler.getServerId() + " has " + nChanges
- + " message(s) in writer queue. This is in RS " + localRsId);
+ + " message(s) in writer queue."));
}
// Check status to know if it is relevant to change the status. Do not
// take RSD lock to test. If we attempt to change the status whereas
- // we are in a status that do not allows that, this will be noticed by
- // the changeStatusFromStatusAnalyzer method. This allows to take the
+ // the current status does allow it, this will be noticed by
+ // the changeStatusFromStatusAnalyzer() method. This allows to take the
// lock roughly only when needed versus every sleep time timeout.
if (degradedStatusThreshold > 0)
// Threshold value = 0 means no status analyzer (no degrading system)
@@ -151,39 +148,18 @@
{
if (nChanges >= degradedStatusThreshold)
{
- if (serverHandler.getStatus() == ServerStatus.NORMAL_STATUS)
+ if (serverHandler.getStatus() == NORMAL_STATUS
+ && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
{
- interrupted =
- replicationServerDomain.changeStatusFromStatusAnalyzer(
- serverHandler,
- StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
- if (interrupted)
- {
- // Finish job and let thread die
- TRACER.debugInfo("Status analyzer for dn "
- + replicationServerDomain.getBaseDn()
- + " has been interrupted and will die. This is in RS "
- + localRsId);
- break;
- }
+ break;
}
- } else
+ }
+ else
{
- if (serverHandler.getStatus() == ServerStatus.DEGRADED_STATUS)
+ if (serverHandler.getStatus() == DEGRADED_STATUS
+ && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
{
- interrupted =
- replicationServerDomain.changeStatusFromStatusAnalyzer(
- serverHandler,
- StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
- if (interrupted)
- {
- // Finish job and let thread die
- TRACER.debugInfo("Status analyzer for dn "
- + replicationServerDomain.getBaseDn()
- + " has been interrupted and will die. This is in RS "
- + localRsId);
- break;
- }
+ break;
}
}
}
@@ -191,9 +167,28 @@
}
done = true;
- TRACER.debugInfo("Status analyzer for dn "
- + replicationServerDomain.getBaseDn() + " is terminated."
- + " This is in RS " + localRsId);
+ TRACER.debugInfo(getMessage("Status analyzer is terminated."));
+ }
+
+ private String getMessage(String message)
+ {
+ return "In RS " + replicationServerDomain.getLocalRSServerId()
+ + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
+ + message;
+ }
+
+ private boolean isInterrupted(DataServerHandler serverHandler,
+ StatusMachineEvent event)
+ {
+ if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler,
+ event))
+ {
+ // Finish job and let thread die
+ TRACER.debugInfo(
+ getMessage("Status analyzer has been interrupted and will die."));
+ return true;
+ }
+ return false;
}
/**
@@ -208,9 +203,7 @@
if (debugEnabled())
{
- TRACER.debugInfo("Shutting down status analyzer for dn "
- + replicationServerDomain.getBaseDn()
- + " in RS " + replicationServerDomain.getLocalRSServerId());
+ TRACER.debugInfo(getMessage("Shutting down status analyzer."));
}
}
}
@@ -231,9 +224,7 @@
n++;
if (n >= FACTOR)
{
- TRACER.debugInfo("Interrupting status analyzer for dn " +
- replicationServerDomain.getBaseDn() + " in RS " +
- replicationServerDomain.getLocalRSServerId());
+ TRACER.debugInfo(getMessage("Interrupting status analyzer."));
interrupt();
}
}
@@ -251,9 +242,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Directory server status analyzer for dn " +
- replicationServerDomain.getBaseDn() + " changing threshold value to " +
- degradedStatusThreshold);
+ TRACER.debugInfo(getMessage(
+ "Directory server status analyzer changing threshold value to "
+ + degradedStatusThreshold));
}
this.degradedStatusThreshold = degradedStatusThreshold;
--
Gitblit v1.10.0