From 0642bd56015b5571ada7bb38b77844c08b574d4c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Jul 2014 09:43:38 +0000
Subject: [PATCH] Code cleanup.

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  289 +++++++++++++++++++++++++++------------------------------
 1 files changed, 138 insertions(+), 151 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 1ae6953..1b00dce 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,7 +27,6 @@
 package org.opends.server.replication.server;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -287,16 +286,15 @@
    * Add an update that has been received to the list of
    * updates that must be forwarded to all other servers.
    *
-   * @param update  The update that has been received.
+   * @param updateMsg  The update that has been received.
    * @param sourceHandler The ServerHandler for the server from which the
    *        update was received
    * @throws IOException When an IO exception happens during the update
    *         processing.
    */
-  public void put(UpdateMsg update, ServerHandler sourceHandler)
-    throws IOException
+  public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException
   {
-    sourceHandler.updateServerState(update);
+    sourceHandler.updateServerState(updateMsg);
     sourceHandler.incrementInCount();
     setGenerationIdIfUnset(sourceHandler.getGenerationId());
 
@@ -318,78 +316,14 @@
      * data centers, but one will request and wait acks only from servers of the
      * data center 1.
      */
-    boolean assuredMessage = update.isAssured();
-    PreparedAssuredInfo preparedAssuredInfo = null;
-    if (assuredMessage)
-    {
-      // Assured feature is supported starting from replication protocol V2
-      if (sourceHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V2)
-      {
-        // According to assured sub-mode, prepare structures to keep track of
-        // the acks we are interested in.
-        AssuredMode assuredMode = update.getAssuredMode();
-        if (assuredMode == AssuredMode.SAFE_DATA_MODE)
-        {
-          sourceHandler.incrementAssuredSdReceivedUpdates();
-          preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
-        } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
-        {
-          sourceHandler.incrementAssuredSrReceivedUpdates();
-          preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
-        } else
-        {
-          // Unknown assured mode: should never happen
-          logger.error(ERR_RS_UNKNOWN_ASSURED_MODE,
-              localReplicationServer.getServerId(), assuredMode, baseDN, update);
-          assuredMessage = false;
-        }
-      } else
-      {
-        assuredMessage = false;
-      }
-    }
+    final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler);
 
-    if (!publishUpdateMsg(update))
+    if (!publishUpdateMsg(updateMsg))
     {
       return;
     }
 
-    List<Integer> expectedServers = null;
-    if (assuredMessage)
-    {
-      expectedServers = preparedAssuredInfo.expectedServers;
-      if (expectedServers != null)
-      {
-        // Store the expected acks info into the global map.
-        // The code for processing reception of acks for this update will update
-        // info kept in this object and if enough acks received, it will send
-        // back the final ack to the requester and remove the object from this
-        // map
-        // OR
-        // The following timer will time out and send an timeout ack to the
-        // requester if the acks are not received in time. The timer will also
-        // remove the object from this map.
-        CSN csn = update.getCSN();
-        waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
-
-        // Arm timer for this assured update message (wait for acks until it
-        // times out)
-        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
-        assuredTimeoutTimer.schedule(assuredTimeoutTask,
-            localReplicationServer.getAssuredTimeout());
-        // Purge timer every 100 treated messages
-        assuredTimeoutTimerPurgeCounter++;
-        if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
-        {
-          assuredTimeoutTimer.purge();
-        }
-      }
-    }
-
-    if (expectedServers == null)
-    {
-      expectedServers = Collections.emptyList();
-    }
+    final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo);
 
     /**
      * The update message equivalent to the originally received update message,
@@ -403,7 +337,8 @@
      * references posted on every writer queues. That is why we need a message
      * version with assured flag on and another one with assured flag off.
      */
-    NotAssuredUpdateMsg notAssuredUpdate = null;
+    final NotAssuredUpdateMsg notAssuredUpdateMsg =
+        preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null;
 
     // Push the message to the replication servers
     if (sourceHandler.isDataServer())
@@ -414,80 +349,145 @@
          * Ignore updates to RS with bad gen id
          * (no system managed status for a RS)
          */
-        if (isDifferentGenerationId(rsHandler.getGenerationId()))
+        if (!isDifferentGenerationId(rsHandler, updateMsg))
         {
-          if (logger.isTraceEnabled())
-          {
-            debug("update " + update.getCSN()
-                + " will not be sent to replication server "
-                + rsHandler.getServerId() + " with generation id "
-                + rsHandler.getGenerationId() + " different from local "
-                + "generation id " + generationId);
-          }
-
-          continue;
+          addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
         }
-
-        notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate,
-            assuredMessage, expectedServers);
       }
     }
 
     // Push the message to the LDAP servers
     for (DataServerHandler dsHandler : connectedDSs.values())
     {
-      // Don't forward the change to the server that just sent it
-      if (dsHandler == sourceHandler)
+      // Do not forward the change to the server that just sent it
+      if (dsHandler != sourceHandler
+          && !isUpdateMsgFiltered(updateMsg, dsHandler))
       {
-        continue;
+        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
       }
-
-      /**
-       * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
-       *
-       * The RSD lock should not be taken here as it is acceptable to have a
-       * delay between the time the server has a wrong status and the fact we
-       * detect it: the updates that succeed to pass during this time will have
-       * no impact on remote server. But it is interesting to not saturate
-       * uselessly the network if the updates are not necessary so this check to
-       * stop sending updates is interesting anyway. Not taking the RSD lock
-       * allows to have better performances in normal mode (most of the time).
-       */
-      ServerStatus dsStatus = dsHandler.getStatus();
-      if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
-          || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
-      {
-        if (logger.isTraceEnabled())
-        {
-          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
-          {
-            debug("update " + update.getCSN()
-                + " will not be sent to directory server "
-                + dsHandler.getServerId() + " with generation id "
-                + dsHandler.getGenerationId() + " different from local "
-                + "generation id " + generationId);
-          }
-          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
-          {
-            debug("update " + update.getCSN()
-                + " will not be sent to directory server "
-                + dsHandler.getServerId() + " as it is in full update");
-          }
-        }
-
-        continue;
-      }
-
-      notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate,
-          assuredMessage, expectedServers);
     }
 
     // Push the message to the other subscribing handlers
     for (MessageHandler mHandler : otherHandlers) {
-      mHandler.add(update);
+      mHandler.add(updateMsg);
     }
   }
 
+  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
+      UpdateMsg updateMsg)
+  {
+    final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId());
+    if (isDifferent && logger.isTraceEnabled())
+    {
+      debug("updateMsg " + updateMsg.getCSN()
+          + " will not be sent to replication server "
+          + rsHandler.getServerId() + " with generation id "
+          + rsHandler.getGenerationId() + " different from local "
+          + "generation id " + generationId);
+    }
+    return isDifferent;
+  }
+
+  /**
+   * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS.
+   * <p>
+   * The RSD lock should not be taken here as it is acceptable to have a delay
+   * between the time the server has a wrong status and the fact we detect it:
+   * the updates that succeed to pass during this time will have no impact on
+   * remote server. But it is interesting to not saturate uselessly the network
+   * if the updates are not necessary so this check to stop sending updates is
+   * interesting anyway. Not taking the RSD lock allows to have better
+   * performances in normal mode (most of the time).
+   */
+  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler)
+  {
+    final ServerStatus dsStatus = dsHandler.getStatus();
+    if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+    {
+      if (logger.isTraceEnabled())
+      {
+        debug("updateMsg " + updateMsg.getCSN()
+            + " will not be sent to directory server "
+            + dsHandler.getServerId() + " with generation id "
+            + dsHandler.getGenerationId() + " different from local "
+            + "generation id " + generationId);
+      }
+      return true;
+    }
+    else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+    {
+      if (logger.isTraceEnabled())
+      {
+        debug("updateMsg " + updateMsg.getCSN()
+            + " will not be sent to directory server "
+            + dsHandler.getServerId() + " as it is in full update");
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg,
+      ServerHandler sourceHandler) throws IOException
+  {
+    // Assured feature is supported starting from replication protocol V2
+    if (!updateMsg.isAssured()
+        || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2)
+    {
+      return null;
+    }
+
+    // According to assured sub-mode, prepare structures to keep track of
+    // the acks we are interested in.
+    switch (updateMsg.getAssuredMode())
+    {
+    case SAFE_DATA_MODE:
+      sourceHandler.incrementAssuredSdReceivedUpdates();
+      return processSafeDataUpdateMsg(updateMsg, sourceHandler);
+
+    case SAFE_READ_MODE:
+      sourceHandler.incrementAssuredSrReceivedUpdates();
+      return processSafeReadUpdateMsg(updateMsg, sourceHandler);
+
+    default:
+      // Unknown assured mode: should never happen
+      logger.error(ERR_RS_UNKNOWN_ASSURED_MODE,
+          localReplicationServer.getServerId(), updateMsg.getAssuredMode(), baseDN, updateMsg);
+      return null;
+    }
+  }
+
+  private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo)
+  {
+    List<Integer> expectedServers = null;
+    if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null)
+    {
+      expectedServers = preparedAssuredInfo.expectedServers;
+      // Store the expected acks info into the global map.
+      // The code for processing reception of acks for this update will update
+      // info kept in this object and if enough acks received, it will send
+      // back the final ack to the requester and remove the object from this map
+      // OR
+      // The following timer will time out and send an timeout ack to the
+      // requester if the acks are not received in time. The timer will also
+      // remove the object from this map.
+      final CSN csn = updateMsg.getCSN();
+      waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
+
+      // Arm timer for this assured update message (wait for acks until it times out)
+      final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
+      assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout());
+      // Purge timer every 100 treated messages
+      assuredTimeoutTimerPurgeCounter++;
+      if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
+      {
+        assuredTimeoutTimer.purge();
+      }
+    }
+
+    return expectedServers != null ? expectedServers : Collections.<Integer> emptyList();
+  }
+
   private boolean publishUpdateMsg(UpdateMsg updateMsg)
   {
     try
@@ -533,33 +533,20 @@
     }
   }
 
-  private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
-      UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
-      boolean assuredMessage, List<Integer> expectedServers)
-      throws UnsupportedEncodingException
+  private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg,
+      NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers)
   {
-    if (assuredMessage)
+    // Assured mode: post an assured or not assured matching update message
+    // according to what has been computed for the destination server
+    if (notAssuredUpdateMsg != null
+        && !assuredServers.contains(sHandler.getServerId()))
     {
-      // Assured mode: post an assured or not assured matching update
-      // message according to what has been computed for the destination server
-      if (expectedServers.contains(sHandler.getServerId()))
-      {
-        sHandler.add(update);
-      }
-      else
-      {
-        if (notAssuredUpdate == null)
-        {
-          notAssuredUpdate = new NotAssuredUpdateMsg(update);
-        }
-        sHandler.add(notAssuredUpdate);
-      }
+      sHandler.add(notAssuredUpdateMsg);
     }
     else
     {
-      sHandler.add(update);
+      sHandler.add(updateMsg);
     }
-    return notAssuredUpdate;
   }
 
   /**

--
Gitblit v1.10.0