From c63e1f305327734be21f5ce0e21bdd2f7a4d143b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 10:32:47 +0000
Subject: [PATCH] Enforced ReplicationServerDomain responsibilities by increasing encapsulation.
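
Cleans up ServerHandler: simplifies boolean expressions and string
concatenations, fixes comment formatting, removes the
createMonitoringPublisher() helper (starting the monitoring publisher
becomes the domain's own responsibility), and flattens the domain lock
acquisition by returning early in the non-timeout case.

The flattened lock acquisition keeps the randomized-timeout tryLock that
breaks symmetric cross-connection deadlocks (see the comment in the hunk
below). As a rough sketch of that pattern, with DomainLock standing in
for ReplicationServerDomain and its tryLock(timeout):

    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    class DomainLock
    {
      private final ReentrantLock lock = new ReentrantLock();

      /**
       * Waits at least 3 seconds plus a random 0 to 5 seconds, so that
       * when two servers attempt cross connections to each other, one
       * listen thread is likely to time out well before the peer one
       * and abort its connection attempt.
       */
      boolean tryLockWithRandomTimeout() throws InterruptedException
      {
        // 3000 + (0..5) * 1000 ms, as in the patched code below.
        long timeoutMs = 3000 + new Random().nextInt(6) * 1000L;
        return lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
      }
    }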

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java |  234 ++++++++++++++++++++++++----------------------------------
 1 file changed, 98 insertions(+), 136 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c377e5f..339ce80 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
       if (debugEnabled())
         TRACER.debugInfo("In " +
           ((handler != null) ? handler.toString() : "Replication Server") +
-          " closing session with err=" +
-          providedMsg.toString());
+          " closing session with err=" + providedMsg);
       logError(providedMsg);
     }
 
@@ -125,7 +124,7 @@
    */
   protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
   /**
-  // Number of updates received from the server in assured safe data mode.
+   * Number of updates received from the server in assured safe data mode.
    */
   protected int assuredSdReceivedUpdates = 0;
   /**
@@ -169,7 +168,7 @@
   /**
    * The initial size of the sending window.
    */
-  int sendWindowSize;
+  protected int sendWindowSize;
   /**
    * remote generation id.
    */
@@ -185,7 +184,7 @@
   /**
    * Group id of this remote server.
    */
-  protected byte groupId = (byte) -1;
+  protected byte groupId = -1;
   /**
    * The SSL encryption after the negotiation with the peer.
    */
@@ -254,8 +253,7 @@
       closeSession(localSession, reason, this);
     }
 
-    if ((replicationServerDomain != null) &&
-        replicationServerDomain.hasLock())
+    if (replicationServerDomain != null && replicationServerDomain.hasLock())
       replicationServerDomain.release();
 
     // If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
     // peer server and the last topo message sent may have failed being
     // sent: in that case retrieve old value of generation id for
     // replication server domain
-    if (oldGenerationId != -100)
+    if (oldGenerationId != -100 && replicationServerDomain != null)
     {
-      if (replicationServerDomain!=null)
-        replicationServerDomain.changeGenerationId(oldGenerationId, false);
+      replicationServerDomain.changeGenerationId(oldGenerationId, false);
     }
   }
 
@@ -304,7 +301,6 @@
   @Override
   public boolean engageShutdown()
   {
-    // Use thread safe boolean
     return shuttingDown.getAndSet(true);
   }
 
@@ -340,13 +336,11 @@
       // sendWindow MUST be created before starting the writer
       sendWindow = new Semaphore(sendWindowSize);
 
-      writer = new ServerWriter(session, this,
-          replicationServerDomain);
+      writer = new ServerWriter(session, this, replicationServerDomain);
       reader = new ServerReader(session, this);
 
-      session.setName("Replication server RS("
-          + this.getReplicationServerId()
-          + ") session thread to " + this.toString() + " at "
+      session.setName("Replication server RS(" + getReplicationServerId()
+          + ") session thread to " + this + " at "
           + session.getReadableRemoteAddress());
       session.start();
       try
@@ -366,9 +360,8 @@
       // Create a thread to send heartbeat messages.
       if (heartbeatInterval > 0)
       {
-        String threadName = "Replication server RS("
-            + this.getReplicationServerId()
-            + ") heartbeat publisher to " + this.toString() + " at "
+        String threadName = "Replication server RS(" + getReplicationServerId()
+            + ") heartbeat publisher to " + this + " at "
             + session.getReadableRemoteAddress();
         heartbeatThread = new HeartbeatThread(threadName, session,
             heartbeatInterval / 3);
@@ -788,7 +781,7 @@
    */
   public boolean isReplicationServer()
   {
-    return (!this.isDataServer());
+    return !this.isDataServer();
   }
 
 
@@ -827,62 +820,58 @@
     // it will be created and locked later in the method
     if (!timedout)
     {
-      // !timedout
       if (!replicationServerDomain.hasLock())
         replicationServerDomain.lock();
+      return;
     }
-    else
+
+    /**
+     * Take the lock on the domain.
+     * WARNING: Here we try to acquire the lock with a timeout. This
+     * prevents a deadlock that may happen when there are cross
+     * connection attempts (for the same domain) from this replication
+     * server and from a peer one:
+     * Here is the scenario:
+     * - RS1 connect thread takes the domain lock and starts
+     * connection to RS2
+     * - at the same time RS2 connect thread takes its domain lock and
+     * starts a connection to RS1
+     * - RS2 listen thread starts processing received
+     * ReplServerStartMsg from RS1 and wants to acquire the lock on
+     * the domain (here) but cannot as RS2 connect thread already has
+     * it
+     * - RS1 listen thread starts processing received
+     * ReplServerStartMsg from RS2 and wants to acquire the lock on
+     * the domain (here) but cannot as RS1 connect thread already has
+     * it
+     * => Deadlock: all 4 threads are blocked.
+     * So to prevent that situation, the listen threads here
+     * will both time out trying to acquire the lock. The random
+     * timeout should allow one connection attempt to be
+     * aborted while the other one has enough time to finish in
+     * the meantime.
+     * Warning: the minimum time (3s) should be long enough to let
+     * connections terminate in normal situations. The added random
+     * time should span a large enough range that one listen thread
+     * is very likely to time out well before the peer one does.
+     * When the first listen thread times out, the remote
+     * connect thread should release the lock and allow the peer
+     * listen thread to take the lock it was waiting for and process
+     * the connection attempt.
+     */
+    Random random = new Random();
+    int randomTime = random.nextInt(6); // Random from 0 to 5
+    // Wait at least 3 seconds + (0 to 5 seconds)
+    long timeout = 3000 + (randomTime * 1000);
+    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+    if (!lockAcquired)
     {
-      // timedout
-      /**
-       * Take the lock on the domain.
-       * WARNING: Here we try to acquire the lock with a timeout. This
-       * is for preventing a deadlock that may happen if there are cross
-       * connection attempts (for same domain) from this replication
-       * server and from a peer one:
-       * Here is the scenario:
-       * - RS1 connect thread takes the domain lock and starts
-       * connection to RS2
-       * - at the same time RS2 connect thread takes his domain lock and
-       * start connection to RS2
-       * - RS2 listen thread starts processing received
-       * ReplServerStartMsg from RS1 and wants to acquire the lock on
-       * the domain (here) but cannot as RS2 connect thread already has
-       * it
-       * - RS1 listen thread starts processing received
-       * ReplServerStartMsg from RS2 and wants to acquire the lock on
-       * the domain (here) but cannot as RS1 connect thread already has
-       * it
-       * => Deadlock: 4 threads are locked.
-       * So to prevent that in such situation, the listen threads here
-       * will both timeout trying to acquire the lock. The random time
-       * for the timeout should allow on connection attempt to be
-       * aborted whereas the other one should have time to finish in the
-       * same time.
-       * Warning: the minimum time (3s) should be big enough to allow
-       * normal situation connections to terminate. The added random
-       * time should represent a big enough range so that the chance to
-       * have one listen thread timing out a lot before the peer one is
-       * great. When the first listen thread times out, the remote
-       * connect thread should release the lock and allow the peer
-       * listen thread to take the lock it was waiting for and process
-       * the connection attempt.
-       */
-      Random random = new Random();
-      int randomTime = random.nextInt(6); // Random from 0 to 5
-      // Wait at least 3 seconds + (0 to 5 seconds)
-      long timeout = 3000 + (randomTime * 1000);
-      boolean noTimeout = replicationServerDomain.tryLock(timeout);
-      if (!noTimeout)
-      {
-        // Timeout
-        Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
-            getBaseDN(),
-            serverId,
-            session.getReadableRemoteAddress(),
-            getReplicationServerId());
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
+      Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+          getBaseDN(),
+          serverId,
+          session.getReadableRemoteAddress(),
+          getReplicationServerId());
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
   }
 
@@ -1011,9 +1000,7 @@
       session.close();
     }
 
-    /*
-     * Stop the heartbeat thread.
-     */
+    // Stop the heartbeat thread.
     if (heartbeatThread != null)
     {
       heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
      */
     try
     {
-      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
+      if (writer != null && !Thread.currentThread().equals(writer))
       {
-
         writer.join(SHUTDOWN_JOIN_TIMEOUT);
       }
-      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
+      if (reader != null && !Thread.currentThread().equals(reader))
       {
         reader.join(SHUTDOWN_JOIN_TIMEOUT);
       }
@@ -1068,7 +1054,7 @@
       {
         // loop until not interrupted
       }
-    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
+    } while ((interrupted || !acquired) && !shutdownWriter);
     if (msg != null)
     {
       incrementOutCount();
@@ -1078,10 +1064,9 @@
         if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
         {
           incrementAssuredSrSentUpdates();
-        } else
+        } else if (!isDataServer())
         {
-          if (!isDataServer())
-            incrementAssuredSdSentUpdates();
+          incrementAssuredSdSentUpdates();
         }
       }
     }
@@ -1094,22 +1079,10 @@
    */
   public RSInfo toRSInfo()
   {
-
-    return new RSInfo(serverId, serverURL, generationId, groupId,
-      weight);
+    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
   }
 
   /**
-   * Starts the monitoring publisher for the domain if not already started.
-   */
-  protected void createMonitoringPublisher()
-  {
-    if (!replicationServerDomain.isRunningMonitoringPublisher())
-    {
-      replicationServerDomain.startMonitoringPublisher();
-    }
-  }
-  /**
    * Update the send window size based on the credit specified in the
    * given window message.
    *
@@ -1132,11 +1105,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-        this.replicationServer.getMonitorInstanceName() + ", " +
-        this.getClass().getSimpleName() + " " + this + ":" +
-        "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
-        "\nAND REPLIED:\n" + outStartMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
+          + "\nAND REPLIED:\n" + outStartMsg);
     }
   }
 
@@ -1151,12 +1123,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-        this.replicationServer.getMonitorInstanceName() + ", " +
-        this.getClass().getSimpleName() + " " + this + ":" +
-        "\nSH START HANDSHAKE SENT("+ this +
-        "):\n" + outStartMsg.toString()+
-        "\nAND RECEIVED:\n" + inStartMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
+          + inStartMsg);
     }
   }
 
@@ -1171,11 +1141,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + ":" +
-          "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
-          "\nAND REPLIED:\n" + outTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
+          + outTopoMsg);
     }
   }
 
@@ -1190,11 +1159,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + ":" +
-          "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
-          "\nAND RECEIVED:\n" + inTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
+          + inTopoMsg);
     }
   }
 
@@ -1209,11 +1177,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
-          "\nAND REPLIED:\n" + outTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
+          + "\nAND REPLIED:\n" + outTopoMsg);
     }
   }
 
@@ -1224,10 +1191,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
     }
   }
 
@@ -1240,11 +1206,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
-          inStartECLSessionMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
     }
   }
 
@@ -1264,10 +1228,9 @@
    */
   public long getReferenceGenId()
   {
-    long refgenid = -1;
-    if (replicationServerDomain!=null)
-      refgenid = replicationServerDomain.getGenerationId();
-    return refgenid;
+    if (replicationServerDomain != null)
+      return replicationServerDomain.getGenerationId();
+    return -1;
   }
 
   /**
@@ -1285,8 +1248,7 @@
    * @param update the update message received.
    * @throws IOException when it occurs.
    */
-  public void put(UpdateMsg update)
-  throws IOException
+  public void put(UpdateMsg update) throws IOException
   {
     if (replicationServerDomain!=null)
       replicationServerDomain.put(update, this);

--
Gitblit v1.10.0