From 99014f56acd4f0944e36e6f8e8bd1e6b0d256c3f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 28 Apr 2014 10:57:22 +0000
Subject: [PATCH] Code cleanup: - Increased MessageHandler encapsulation. - Removed ServerHandler.closeSession() because it is adding unnecessary complexity in understanding calling sites.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 5 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 9 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 46 ++++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 107 +++++++++--------------------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 3
6 files changed, 60 insertions(+), 113 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index d1264a1..01dc266 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -654,7 +654,6 @@
*/
public void receiveNewStatus(ChangeStatusMsg csMsg)
{
- if (replicationServerDomain!=null)
- replicationServerDomain.processNewStatus(this, csMsg);
+ replicationServerDomain.processNewStatus(this, csMsg);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index ea6af09..ffea811 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -838,8 +838,7 @@
*/
private void registerIntoDomain()
{
- if (replicationServerDomain != null)
- replicationServerDomain.registerHandler(this);
+ replicationServerDomain.registerHandler(this);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index e0a4e76..452a839 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -60,7 +60,7 @@
* Message are buffered into a queue.
* Consumers are expected to come and consume the UpdateMsg from the queue.
*/
-public class MessageHandler extends MonitorProvider<MonitorProviderCfg>
+class MessageHandler extends MonitorProvider<MonitorProviderCfg>
{
/**
@@ -88,11 +88,11 @@
/**
* Number of update sent to the server.
*/
- protected int outCount = 0;
+ private int outCount = 0;
/**
* Number of updates received from the server.
*/
- protected int inCount = 0;
+ private int inCount = 0;
/**
* Specifies the max queue size for this handler.
*/
@@ -100,7 +100,7 @@
/**
* Specifies the max queue size in bytes for this handler.
*/
- protected int maxQueueBytesSize = maxQueueSize * 100;
+ private int maxQueueBytesSize = maxQueueSize * 100;
/**
* Specifies whether the consumer is following the producer (is not late).
*/
@@ -130,7 +130,7 @@
* in memory by this ServerHandler.
* @param replicationServer The hosting replication server.
*/
- public MessageHandler(int queueSize, ReplicationServer replicationServer)
+ MessageHandler(int queueSize, ReplicationServer replicationServer)
{
this.maxQueueSize = queueSize;
this.maxQueueBytesSize = queueSize * 100;
@@ -144,7 +144,7 @@
* @param update The update that must be added to the list of updates of
* this handler.
*/
- public void add(UpdateMsg update)
+ void add(UpdateMsg update)
{
synchronized (msgQueue)
{
@@ -153,7 +153,9 @@
* waiting for some changes, wake it up
*/
if (msgQueue.isEmpty())
+ {
msgQueue.notify();
+ }
msgQueue.add(update);
@@ -183,7 +185,7 @@
* Set the shut down flag to true and returns the previous value of the flag.
* @return The previous value of the shut down flag
*/
- public boolean engageShutdown()
+ boolean engageShutdown()
{
return shuttingDown.getAndSet(true);
}
@@ -192,7 +194,7 @@
* Returns the shutdown flag.
* @return The shutdown flag value.
*/
- public boolean shuttingDown()
+ boolean shuttingDown()
{
return shuttingDown.get();
}
@@ -202,9 +204,8 @@
*
* @param waitConnections Waits for the Connections with other RS to
* be established before returning.
- * @return The replication server domain.
*/
- public ReplicationServerDomain getDomain(boolean waitConnections)
+ private void setDomain(boolean waitConnections)
{
if (replicationServerDomain == null)
{
@@ -214,14 +215,13 @@
replicationServer.waitConnections();
}
}
- return replicationServerDomain;
}
/**
* Get the count of updates received from the server.
* @return the count of update received from the server.
*/
- public int getInCount()
+ int getInCount()
{
return inCount;
}
@@ -375,10 +375,14 @@
while (msgQueue.isEmpty() && following)
{
if (!synchronous)
+ {
return null;
+ }
msgQueue.wait(500);
if (!activeConsumer)
+ {
return null;
+ }
}
} catch (InterruptedException e)
{
@@ -478,7 +482,7 @@
* Get the count of updates sent to this server.
* @return The count of update sent to this server.
*/
- public int getOutCount()
+ int getOutCount()
{
return outCount;
}
@@ -545,7 +549,7 @@
/**
* Increase the counter of updates received from the server.
*/
- public void incrementInCount()
+ void incrementInCount()
{
inCount++;
}
@@ -553,14 +557,12 @@
/**
* Increase the counter of updates sent to the server.
*/
- public void incrementOutCount()
+ void incrementOutCount()
{
outCount++;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void initializeMonitorProvider(MonitorProviderCfg configuration)
throws ConfigException, InitializationException
@@ -619,8 +621,8 @@
else
{
this.baseDN = baseDN;
- if (!baseDN.toNormalizedString().equals("cn=changelog"))
- this.replicationServerDomain = getDomain(isDataServer);
+ setDomain(!"cn=changelog".equals(baseDN.toNormalizedString())
+ && isDataServer);
}
}
@@ -645,7 +647,7 @@
* @param msg the last update sent.
* @return boolean indicating if the update was meaningful.
*/
- public boolean updateServerState(UpdateMsg msg)
+ boolean updateServerState(UpdateMsg msg)
{
return serverState.update(msg.getCSN());
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index a765e8c..53628b0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -261,7 +261,7 @@
// We did not recognize the message, close session as what
// can happen after is undetermined and we do not want the server to
// be disturbed
- ServerHandler.closeSession(session, null, null);
+ session.close();
return;
}
}
@@ -275,10 +275,9 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- if (!shutdown) {
- Message message =
- ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
- logError(message);
+ if (!shutdown)
+ {
+ logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()));
}
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index d68a04a..b019085 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -756,8 +756,7 @@
public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
throws DirectoryException, IOException
{
- if (replicationServerDomain != null)
- replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
+ replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 9c165a8..e35ddbc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,6 @@
package org.opends.server.replication.server;
import java.io.IOException;
-
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
@@ -63,35 +62,6 @@
private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
/**
- * Close the session and log the provided error message
- * Log nothing if message is null.
- * @param providedSession The provided closing session.
- * @param providedMsg The provided error message.
- * @param handler The handler that manages that session.
- */
- static protected void closeSession(Session providedSession,
- Message providedMsg, ServerHandler handler)
- {
- if (providedMsg != null)
- {
- if (debugEnabled())
- TRACER.debugInfo("In " +
- ((handler != null) ? handler.toString() : "Replication Server") +
- " closing session with err=" + providedMsg);
- logError(providedMsg);
- }
-
- if (providedSession != null)
- {
- // This method is only called when aborting a failing handshake and
- // not StopMsg should be sent in such situation. StopMsg are only
- // expected when full handshake has been performed, or at end of
- // handshake phase 1, when DS was just gathering available RS info
- providedSession.close();
- }
- }
-
- /**
* The serverId of the remote server.
*/
protected int serverId;
@@ -243,7 +213,20 @@
Session localSession = session;
if (localSession != null)
{
- closeSession(localSession, reason, this);
+ if (reason != null)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " + this + " closing session with err=" + reason);
+ }
+ logError(reason);
+ }
+
+ // This method is only called when aborting a failing handshake and
+ // not StopMsg should be sent in such situation. StopMsg are only
+ // expected when full handshake has been performed, or at end of
+ // handshake phase 1, when DS was just gathering available RS info
+ localSession.close();
}
releaseDomainLock();
@@ -252,7 +235,7 @@
// We may have changed it as it was -1 and we received a value >0 from peer
// server and the last topo message sent may have failed being sent: in that
// case retrieve old value of generation id for replication server domain
- if (oldGenerationId != -100 && replicationServerDomain != null)
+ if (oldGenerationId != -100)
{
replicationServerDomain.changeGenerationId(oldGenerationId);
}
@@ -263,7 +246,7 @@
*/
protected void releaseDomainLock()
{
- if (replicationServerDomain != null && replicationServerDomain.hasLock())
+ if (replicationServerDomain.hasLock())
{
replicationServerDomain.release();
}
@@ -333,8 +316,7 @@
{
final Message message =
ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
- throw new DirectoryException(ResultCode.OTHER,
- message, e);
+ throw new DirectoryException(ResultCode.OTHER, message, e);
}
reader.start();
writer.start();
@@ -366,7 +348,7 @@
public void send(ReplicationMsg msg) throws IOException
{
// avoid logging anything for unit tests that include a null domain.
- if (debugEnabled() && replicationServerDomain != null)
+ if (debugEnabled())
{
TRACER.debugInfo("In "
+ replicationServerDomain.getLocalRSMonitorInstanceName() + " "
@@ -515,16 +497,6 @@
return heartbeatInterval;
}
- /**
- * Get the count of updates received from the server.
- * @return the count of update received from the server.
- */
- @Override
- public int getInCount()
- {
- return inCount;
- }
-
/** {@inheritDoc} */
@Override
public List<Attribute> getMonitorData()
@@ -597,16 +569,6 @@
public abstract String getMonitorInstanceName();
/**
- * Get the count of updates sent to this server.
- * @return The count of update sent to this server.
- */
- @Override
- public int getOutCount()
- {
- return outCount;
- }
-
- /**
* Gets the protocol version used with this remote server.
* @return The protocol version used with this remote server.
*/
@@ -714,9 +676,7 @@
assuredSrSentUpdatesTimeout.incrementAndGet();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void initializeMonitorProvider(MonitorProviderCfg configuration)
throws ConfigException, InitializationException
@@ -822,16 +782,11 @@
public void lockDomainWithTimeout() throws DirectoryException,
InterruptedException
{
- if (replicationServerDomain == null)
- {
- return;
- }
-
- Random random = new Random();
- int randomTime = random.nextInt(6); // Random from 0 to 5
+ final Random random = new Random();
+ final int randomTime = random.nextInt(6); // Random from 0 to 5
// Wait at least 3 seconds + (0 to 5 seconds)
- long timeout = 3000 + (randomTime * 1000);
- boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+ final long timeout = 3000 + (randomTime * 1000);
+ final boolean lockAcquired = replicationServerDomain.tryLock(timeout);
if (!lockAcquired)
{
Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
@@ -1197,8 +1152,7 @@
*/
void processAck(AckMsg ack)
{
- if (replicationServerDomain!=null)
- replicationServerDomain.processAck(ack, this);
+ replicationServerDomain.processAck(ack, this);
}
/**
@@ -1207,9 +1161,7 @@
*/
public long getReferenceGenId()
{
- if (replicationServerDomain != null)
- return replicationServerDomain.getGenerationId();
- return -1;
+ return replicationServerDomain.getGenerationId();
}
/**
@@ -1218,8 +1170,7 @@
*/
void processResetGenId(ResetGenerationIdMsg msg)
{
- if (replicationServerDomain!=null)
- replicationServerDomain.resetGenerationId(this, msg);
+ replicationServerDomain.resetGenerationId(this, msg);
}
/**
@@ -1230,8 +1181,7 @@
public void put(UpdateMsg update) throws IOException
{
decAndCheckWindow();
- if (replicationServerDomain!=null)
- replicationServerDomain.put(update, this);
+ replicationServerDomain.put(update, this);
}
/**
@@ -1239,8 +1189,7 @@
*/
public void doStop()
{
- if (replicationServerDomain!=null)
- replicationServerDomain.stopServer(this, false);
+ replicationServerDomain.stopServer(this, false);
}
/**
--
Gitblit v1.10.0