From c63e1f305327734be21f5ce0e21bdd2f7a4d143b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 10:32:47 +0000
Subject: [PATCH] Enforced ReplicationServerDomain responsibilities by increasing encapsulation.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 234 ++++++++++++++++++++++++----------------------------------
1 files changed, 98 insertions(+), 136 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c377e5f..339ce80 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
if (debugEnabled())
TRACER.debugInfo("In " +
((handler != null) ? handler.toString() : "Replication Server") +
- " closing session with err=" +
- providedMsg.toString());
+ " closing session with err=" + providedMsg);
logError(providedMsg);
}
@@ -125,7 +124,7 @@
*/
protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
/**
- // Number of updates received from the server in assured safe data mode.
+ * Number of updates received from the server in assured safe data mode.
*/
protected int assuredSdReceivedUpdates = 0;
/**
@@ -169,7 +168,7 @@
/**
* The initial size of the sending window.
*/
- int sendWindowSize;
+ protected int sendWindowSize;
/**
* remote generation id.
*/
@@ -185,7 +184,7 @@
/**
* Group id of this remote server.
*/
- protected byte groupId = (byte) -1;
+ protected byte groupId = -1;
/**
* The SSL encryption after the negotiation with the peer.
*/
@@ -254,8 +253,7 @@
closeSession(localSession, reason, this);
}
- if ((replicationServerDomain != null) &&
- replicationServerDomain.hasLock())
+ if (replicationServerDomain != null && replicationServerDomain.hasLock())
replicationServerDomain.release();
// If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
// peer server and the last topo message sent may have failed being
// sent: in that case retrieve old value of generation id for
// replication server domain
- if (oldGenerationId != -100)
+ if (oldGenerationId != -100 && replicationServerDomain != null)
{
- if (replicationServerDomain!=null)
- replicationServerDomain.changeGenerationId(oldGenerationId, false);
+ replicationServerDomain.changeGenerationId(oldGenerationId, false);
}
}
@@ -304,7 +301,6 @@
@Override
public boolean engageShutdown()
{
- // Use thread safe boolean
return shuttingDown.getAndSet(true);
}
@@ -340,13 +336,11 @@
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
- writer = new ServerWriter(session, this,
- replicationServerDomain);
+ writer = new ServerWriter(session, this, replicationServerDomain);
reader = new ServerReader(session, this);
- session.setName("Replication server RS("
- + this.getReplicationServerId()
- + ") session thread to " + this.toString() + " at "
+ session.setName("Replication server RS(" + getReplicationServerId()
+ + ") session thread to " + this + " at "
+ session.getReadableRemoteAddress());
session.start();
try
@@ -366,9 +360,8 @@
// Create a thread to send heartbeat messages.
if (heartbeatInterval > 0)
{
- String threadName = "Replication server RS("
- + this.getReplicationServerId()
- + ") heartbeat publisher to " + this.toString() + " at "
+ String threadName = "Replication server RS(" + getReplicationServerId()
+ + ") heartbeat publisher to " + this + " at "
+ session.getReadableRemoteAddress();
heartbeatThread = new HeartbeatThread(threadName, session,
heartbeatInterval / 3);
@@ -788,7 +781,7 @@
*/
public boolean isReplicationServer()
{
- return (!this.isDataServer());
+ return !this.isDataServer();
}
@@ -827,62 +820,58 @@
// it will be created and locked later in the method
if (!timedout)
{
- // !timedout
if (!replicationServerDomain.hasLock())
replicationServerDomain.lock();
+ return;
}
- else
+
+ /**
+ * Take the lock on the domain.
+ * WARNING: Here we try to acquire the lock with a timeout. This
+ * is for preventing a deadlock that may happen if there are cross
+ * connection attempts (for same domain) from this replication
+ * server and from a peer one:
+ * Here is the scenario:
+ * - RS1 connect thread takes the domain lock and starts
+ * connection to RS2
+ * - at the same time RS2 connect thread takes his domain lock and
+ * start connection to RS2
+ * - RS2 listen thread starts processing received
+ * ReplServerStartMsg from RS1 and wants to acquire the lock on
+ * the domain (here) but cannot as RS2 connect thread already has
+ * it
+ * - RS1 listen thread starts processing received
+ * ReplServerStartMsg from RS2 and wants to acquire the lock on
+ * the domain (here) but cannot as RS1 connect thread already has
+ * it
+ * => Deadlock: 4 threads are locked.
+ * So to prevent that in such situation, the listen threads here
+ * will both timeout trying to acquire the lock. The random time
+ * for the timeout should allow on connection attempt to be
+ * aborted whereas the other one should have time to finish in the
+ * same time.
+ * Warning: the minimum time (3s) should be big enough to allow
+ * normal situation connections to terminate. The added random
+ * time should represent a big enough range so that the chance to
+ * have one listen thread timing out a lot before the peer one is
+ * great. When the first listen thread times out, the remote
+ * connect thread should release the lock and allow the peer
+ * listen thread to take the lock it was waiting for and process
+ * the connection attempt.
+ */
+ Random random = new Random();
+ int randomTime = random.nextInt(6); // Random from 0 to 5
+ // Wait at least 3 seconds + (0 to 5 seconds)
+ long timeout = 3000 + (randomTime * 1000);
+ boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+ if (!lockAcquired)
{
- // timedout
- /**
- * Take the lock on the domain.
- * WARNING: Here we try to acquire the lock with a timeout. This
- * is for preventing a deadlock that may happen if there are cross
- * connection attempts (for same domain) from this replication
- * server and from a peer one:
- * Here is the scenario:
- * - RS1 connect thread takes the domain lock and starts
- * connection to RS2
- * - at the same time RS2 connect thread takes his domain lock and
- * start connection to RS2
- * - RS2 listen thread starts processing received
- * ReplServerStartMsg from RS1 and wants to acquire the lock on
- * the domain (here) but cannot as RS2 connect thread already has
- * it
- * - RS1 listen thread starts processing received
- * ReplServerStartMsg from RS2 and wants to acquire the lock on
- * the domain (here) but cannot as RS1 connect thread already has
- * it
- * => Deadlock: 4 threads are locked.
- * So to prevent that in such situation, the listen threads here
- * will both timeout trying to acquire the lock. The random time
- * for the timeout should allow on connection attempt to be
- * aborted whereas the other one should have time to finish in the
- * same time.
- * Warning: the minimum time (3s) should be big enough to allow
- * normal situation connections to terminate. The added random
- * time should represent a big enough range so that the chance to
- * have one listen thread timing out a lot before the peer one is
- * great. When the first listen thread times out, the remote
- * connect thread should release the lock and allow the peer
- * listen thread to take the lock it was waiting for and process
- * the connection attempt.
- */
- Random random = new Random();
- int randomTime = random.nextInt(6); // Random from 0 to 5
- // Wait at least 3 seconds + (0 to 5 seconds)
- long timeout = 3000 + (randomTime * 1000);
- boolean noTimeout = replicationServerDomain.tryLock(timeout);
- if (!noTimeout)
- {
- // Timeout
- Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
- getBaseDN(),
- serverId,
- session.getReadableRemoteAddress(),
- getReplicationServerId());
- throw new DirectoryException(ResultCode.OTHER, message);
- }
+ Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+ getBaseDN(),
+ serverId,
+ session.getReadableRemoteAddress(),
+ getReplicationServerId());
+ throw new DirectoryException(ResultCode.OTHER, message);
}
}
@@ -1011,9 +1000,7 @@
session.close();
}
- /*
- * Stop the heartbeat thread.
- */
+ // Stop the heartbeat thread.
if (heartbeatThread != null)
{
heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
*/
try
{
- if ((writer != null) && (!(Thread.currentThread().equals(writer))))
+ if (writer != null && !Thread.currentThread().equals(writer))
{
-
writer.join(SHUTDOWN_JOIN_TIMEOUT);
}
- if ((reader != null) && (!(Thread.currentThread().equals(reader))))
+ if (reader != null && !Thread.currentThread().equals(reader))
{
reader.join(SHUTDOWN_JOIN_TIMEOUT);
}
@@ -1068,7 +1054,7 @@
{
// loop until not interrupted
}
- } while (((interrupted) || (!acquired)) && (!shutdownWriter));
+ } while ((interrupted || !acquired) && !shutdownWriter);
if (msg != null)
{
incrementOutCount();
@@ -1078,10 +1064,9 @@
if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
incrementAssuredSrSentUpdates();
- } else
+ } else if (!isDataServer())
{
- if (!isDataServer())
- incrementAssuredSdSentUpdates();
+ incrementAssuredSdSentUpdates();
}
}
}
@@ -1094,22 +1079,10 @@
*/
public RSInfo toRSInfo()
{
-
- return new RSInfo(serverId, serverURL, generationId, groupId,
- weight);
+ return new RSInfo(serverId, serverURL, generationId, groupId, weight);
}
/**
- * Starts the monitoring publisher for the domain if not already started.
- */
- protected void createMonitoringPublisher()
- {
- if (!replicationServerDomain.isRunningMonitoringPublisher())
- {
- replicationServerDomain.startMonitoringPublisher();
- }
- }
- /**
* Update the send window size based on the credit specified in the
* given window message.
*
@@ -1132,11 +1105,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
- "\nAND REPLIED:\n" + outStartMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
+ + "\nAND REPLIED:\n" + outStartMsg);
}
}
@@ -1151,12 +1123,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH START HANDSHAKE SENT("+ this +
- "):\n" + outStartMsg.toString()+
- "\nAND RECEIVED:\n" + inStartMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
+ + inStartMsg);
}
}
@@ -1171,11 +1141,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
- "\nAND REPLIED:\n" + outTopoMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
+ + outTopoMsg);
}
}
@@ -1190,11 +1159,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + ":" +
- "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
- "\nAND RECEIVED:\n" + inTopoMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + ":"
+ + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
+ + inTopoMsg);
}
}
@@ -1209,11 +1177,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
- "\nAND REPLIED:\n" + outTopoMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + " :"
+ + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
+ + "\nAND REPLIED:\n" + outTopoMsg);
}
}
@@ -1224,10 +1191,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + " :"
+ + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
}
}
@@ -1240,11 +1206,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- this.replicationServer.getMonitorInstanceName() + ", " +
- this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED:\n" +
- inStartECLSessionMsg.toString());
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + ", " + getClass().getSimpleName() + " " + this + " :"
+ + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
}
}
@@ -1264,10 +1228,9 @@
*/
public long getReferenceGenId()
{
- long refgenid = -1;
- if (replicationServerDomain!=null)
- refgenid = replicationServerDomain.getGenerationId();
- return refgenid;
+ if (replicationServerDomain != null)
+ return replicationServerDomain.getGenerationId();
+ return -1;
}
/**
@@ -1285,8 +1248,7 @@
* @param update the update message received.
* @throws IOException when it occurs.
*/
- public void put(UpdateMsg update)
- throws IOException
+ public void put(UpdateMsg update) throws IOException
{
if (replicationServerDomain!=null)
replicationServerDomain.put(update, this);
--
Gitblit v1.10.0