From 667d7253a3873ed64dafbffe39d8a84a298c1fdc Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 28 Apr 2014 10:57:22 +0000
Subject: [PATCH] Code cleanup: - Increased MessageHandler encapsulation. - Removed ServerHandler.closeSession() because it is adding unnecessary complexity in understanding calling sites.

---
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java        |    5 -
 opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java |    3 
 opends/src/server/org/opends/server/replication/server/MessageHandler.java           |   46 ++++++++-------
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java        |    9 +-
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java         |    3 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java            |  107 +++++++++--------------------------
 6 files changed, 60 insertions(+), 113 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index d1264a1..01dc266 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -654,7 +654,6 @@
    */
   public void receiveNewStatus(ChangeStatusMsg csMsg)
   {
-    if (replicationServerDomain!=null)
-      replicationServerDomain.processNewStatus(this, csMsg);
+    replicationServerDomain.processNewStatus(this, csMsg);
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index ea6af09..ffea811 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -838,8 +838,7 @@
    */
   private void registerIntoDomain()
   {
-    if (replicationServerDomain != null)
-      replicationServerDomain.registerHandler(this);
+    replicationServerDomain.registerHandler(this);
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index e0a4e76..452a839 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -60,7 +60,7 @@
  * Message are buffered into a queue.
  * Consumers are expected to come and consume the UpdateMsg from the queue.
  */
-public class MessageHandler extends MonitorProvider<MonitorProviderCfg>
+class MessageHandler extends MonitorProvider<MonitorProviderCfg>
 {
 
   /**
@@ -88,11 +88,11 @@
   /**
    * Number of update sent to the server.
    */
-  protected int outCount = 0;
+  private int outCount = 0;
   /**
    * Number of updates received from the server.
    */
-  protected int inCount = 0;
+  private int inCount = 0;
   /**
    * Specifies the max queue size for this handler.
    */
@@ -100,7 +100,7 @@
   /**
    * Specifies the max queue size in bytes for this handler.
    */
-  protected int maxQueueBytesSize = maxQueueSize * 100;
+  private int maxQueueBytesSize = maxQueueSize * 100;
   /**
    * Specifies whether the consumer is following the producer (is not late).
    */
@@ -130,7 +130,7 @@
    *                  in memory by this ServerHandler.
    * @param replicationServer The hosting replication server.
    */
-  public MessageHandler(int queueSize, ReplicationServer replicationServer)
+  MessageHandler(int queueSize, ReplicationServer replicationServer)
   {
     this.maxQueueSize = queueSize;
     this.maxQueueBytesSize = queueSize * 100;
@@ -144,7 +144,7 @@
    * @param update The update that must be added to the list of updates of
    * this handler.
    */
-  public void add(UpdateMsg update)
+  void add(UpdateMsg update)
   {
     synchronized (msgQueue)
     {
@@ -153,7 +153,9 @@
        * waiting for some changes, wake it up
        */
       if (msgQueue.isEmpty())
+      {
         msgQueue.notify();
+      }
 
       msgQueue.add(update);
 
@@ -183,7 +185,7 @@
    * Set the shut down flag to true and returns the previous value of the flag.
    * @return The previous value of the shut down flag
    */
-  public boolean engageShutdown()
+  boolean engageShutdown()
   {
     return shuttingDown.getAndSet(true);
   }
@@ -192,7 +194,7 @@
    * Returns the shutdown flag.
    * @return The shutdown flag value.
    */
-  public boolean shuttingDown()
+  boolean shuttingDown()
   {
     return shuttingDown.get();
   }
@@ -202,9 +204,8 @@
    *
    * @param waitConnections     Waits for the Connections with other RS to
    *                            be established before returning.
-   * @return The replication server domain.
    */
-  public ReplicationServerDomain getDomain(boolean waitConnections)
+  private void setDomain(boolean waitConnections)
   {
     if (replicationServerDomain == null)
     {
@@ -214,14 +215,13 @@
         replicationServer.waitConnections();
       }
     }
-    return replicationServerDomain;
   }
 
   /**
    * Get the count of updates received from the server.
    * @return the count of update received from the server.
    */
-  public int getInCount()
+  int getInCount()
   {
     return inCount;
   }
@@ -375,10 +375,14 @@
             while (msgQueue.isEmpty() && following)
             {
               if (!synchronous)
+              {
                 return null;
+              }
               msgQueue.wait(500);
               if (!activeConsumer)
+              {
                 return null;
+              }
             }
           } catch (InterruptedException e)
           {
@@ -478,7 +482,7 @@
    * Get the count of updates sent to this server.
    * @return  The count of update sent to this server.
    */
-  public int getOutCount()
+  int getOutCount()
   {
     return outCount;
   }
@@ -545,7 +549,7 @@
   /**
    * Increase the counter of updates received from the server.
    */
-  public void incrementInCount()
+  void incrementInCount()
   {
     inCount++;
   }
@@ -553,14 +557,12 @@
   /**
    * Increase the counter of updates sent to the server.
    */
-  public void incrementOutCount()
+  void incrementOutCount()
   {
     outCount++;
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public void initializeMonitorProvider(MonitorProviderCfg configuration)
   throws ConfigException, InitializationException
@@ -619,8 +621,8 @@
     else
     {
       this.baseDN = baseDN;
-      if (!baseDN.toNormalizedString().equals("cn=changelog"))
-        this.replicationServerDomain = getDomain(isDataServer);
+      setDomain(!"cn=changelog".equals(baseDN.toNormalizedString())
+      		&& isDataServer);
     }
   }
 
@@ -645,7 +647,7 @@
    * @param msg the last update sent.
    * @return boolean indicating if the update was meaningful.
    */
-  public boolean updateServerState(UpdateMsg msg)
+  boolean updateServerState(UpdateMsg msg)
   {
     return serverState.update(msg.getCSN());
   }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index a765e8c..53628b0 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -261,7 +261,7 @@
           // We did not recognize the message, close session as what
           // can happen after is undetermined and we do not want the server to
           // be disturbed
-          ServerHandler.closeSession(session, null, null);
+          session.close();
           return;
         }
       }
@@ -275,10 +275,9 @@
         {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
-        if (!shutdown) {
-          Message message =
-            ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
-          logError(message);
+        if (!shutdown)
+        {
+          logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()));
         }
       }
     }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index d68a04a..b019085 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -756,8 +756,7 @@
   public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
   throws DirectoryException, IOException
   {
-    if (replicationServerDomain != null)
-      replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
+    replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
   }
 
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 9c165a8..e35ddbc 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,6 @@
 package org.opends.server.replication.server;
 
 import java.io.IOException;
-
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
@@ -63,35 +62,6 @@
   private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
 
   /**
-   * Close the session and log the provided error message
-   * Log nothing if message is null.
-   * @param providedSession The provided closing session.
-   * @param providedMsg     The provided error message.
-   * @param handler         The handler that manages that session.
-   */
-  static protected void closeSession(Session providedSession,
-      Message providedMsg, ServerHandler handler)
-  {
-    if (providedMsg != null)
-    {
-      if (debugEnabled())
-        TRACER.debugInfo("In " +
-          ((handler != null) ? handler.toString() : "Replication Server") +
-          " closing session with err=" + providedMsg);
-      logError(providedMsg);
-    }
-
-    if (providedSession != null)
-    {
-      // This method is only called when aborting a failing handshake and
-      // not StopMsg should be sent in such situation. StopMsg are only
-      // expected when full handshake has been performed, or at end of
-      // handshake phase 1, when DS was just gathering available RS info
-      providedSession.close();
-    }
-  }
-
-  /**
    * The serverId of the remote server.
    */
   protected int serverId;
@@ -243,7 +213,20 @@
     Session localSession = session;
     if (localSession != null)
     {
-      closeSession(localSession, reason, this);
+      if (reason != null)
+      {
+        if (debugEnabled())
+        {
+         TRACER.debugInfo("In " + this + " closing session with err=" + reason);
+        }
+        logError(reason);
+      }
+
+      // This method is only called when aborting a failing handshake and
+      // not StopMsg should be sent in such situation. StopMsg are only
+      // expected when full handshake has been performed, or at end of
+      // handshake phase 1, when DS was just gathering available RS info
+      localSession.close();
     }
 
     releaseDomainLock();
@@ -252,7 +235,7 @@
     // We may have changed it as it was -1 and we received a value >0 from peer
     // server and the last topo message sent may have failed being sent: in that
     // case retrieve old value of generation id for replication server domain
-    if (oldGenerationId != -100 && replicationServerDomain != null)
+    if (oldGenerationId != -100)
     {
       replicationServerDomain.changeGenerationId(oldGenerationId);
     }
@@ -263,7 +246,7 @@
    */
   protected void releaseDomainLock()
   {
-    if (replicationServerDomain != null && replicationServerDomain.hasLock())
+    if (replicationServerDomain.hasLock())
     {
       replicationServerDomain.release();
     }
@@ -333,8 +316,7 @@
       {
         final Message message =
             ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
-        throw new DirectoryException(ResultCode.OTHER,
-            message, e);
+        throw new DirectoryException(ResultCode.OTHER, message, e);
       }
       reader.start();
       writer.start();
@@ -366,7 +348,7 @@
   public void send(ReplicationMsg msg) throws IOException
   {
     // avoid logging anything for unit tests that include a null domain.
-    if (debugEnabled() && replicationServerDomain != null)
+    if (debugEnabled())
     {
       TRACER.debugInfo("In "
           + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
@@ -515,16 +497,6 @@
     return heartbeatInterval;
   }
 
-  /**
-   * Get the count of updates received from the server.
-   * @return the count of update received from the server.
-   */
-  @Override
-  public int getInCount()
-  {
-    return inCount;
-  }
-
   /** {@inheritDoc} */
   @Override
   public List<Attribute> getMonitorData()
@@ -597,16 +569,6 @@
   public abstract String getMonitorInstanceName();
 
   /**
-   * Get the count of updates sent to this server.
-   * @return  The count of update sent to this server.
-   */
-  @Override
-  public int getOutCount()
-  {
-    return outCount;
-  }
-
-  /**
    * Gets the protocol version used with this remote server.
    * @return The protocol version used with this remote server.
    */
@@ -714,9 +676,7 @@
     assuredSrSentUpdatesTimeout.incrementAndGet();
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public void initializeMonitorProvider(MonitorProviderCfg configuration)
   throws ConfigException, InitializationException
@@ -822,16 +782,11 @@
   public void lockDomainWithTimeout() throws DirectoryException,
       InterruptedException
   {
-    if (replicationServerDomain == null)
-    {
-      return;
-    }
-
-    Random random = new Random();
-    int randomTime = random.nextInt(6); // Random from 0 to 5
+    final Random random = new Random();
+    final int randomTime = random.nextInt(6); // Random from 0 to 5
     // Wait at least 3 seconds + (0 to 5 seconds)
-    long timeout = 3000 + (randomTime * 1000);
-    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+    final long timeout = 3000 + (randomTime * 1000);
+    final boolean lockAcquired = replicationServerDomain.tryLock(timeout);
     if (!lockAcquired)
     {
       Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
@@ -1197,8 +1152,7 @@
    */
   void processAck(AckMsg ack)
   {
-    if (replicationServerDomain!=null)
-      replicationServerDomain.processAck(ack, this);
+    replicationServerDomain.processAck(ack, this);
   }
 
   /**
@@ -1207,9 +1161,7 @@
    */
   public long getReferenceGenId()
   {
-    if (replicationServerDomain != null)
-      return replicationServerDomain.getGenerationId();
-    return -1;
+    return replicationServerDomain.getGenerationId();
   }
 
   /**
@@ -1218,8 +1170,7 @@
    */
   void processResetGenId(ResetGenerationIdMsg msg)
   {
-    if (replicationServerDomain!=null)
-      replicationServerDomain.resetGenerationId(this, msg);
+    replicationServerDomain.resetGenerationId(this, msg);
   }
 
   /**
@@ -1230,8 +1181,7 @@
   public void put(UpdateMsg update) throws IOException
   {
     decAndCheckWindow();
-    if (replicationServerDomain!=null)
-      replicationServerDomain.put(update, this);
+    replicationServerDomain.put(update, this);
   }
 
   /**
@@ -1239,8 +1189,7 @@
    */
   public void doStop()
   {
-    if (replicationServerDomain!=null)
-      replicationServerDomain.stopServer(this, false);
+    replicationServerDomain.stopServer(this, false);
   }
 
   /**

--
Gitblit v1.10.0