From 46fd9423ab622d7f9531aa1564846ec52fe09534 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Wed, 24 Apr 2013 12:44:51 +0000
Subject: [PATCH] Replication Cleanup.

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |  589 +++++++++++++++++++++++++++++-----------------------------
 1 files changed, 292 insertions(+), 297 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index f9ab3eb..8ceab31 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -39,6 +39,7 @@
 import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -165,8 +166,8 @@
    * to be able to correlate all the coming back acks to the original
    * operation.
    */
-  private final SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
-    new TreeMap<ChangeNumber, UpdateMsg>();
+  private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs =
+    new ConcurrentHashMap<ChangeNumber, UpdateMsg>();
 
 
   /**
@@ -243,7 +244,7 @@
   // that have not been successfully acknowledged (either because of timeout,
   // wrong status or error at replay) for a particular server (DS or RS). String
   // format: <server id>:<number of failed updates>
-  private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
+  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
     new HashMap<Integer,Integer>();
   // Number of updates received in Assured Mode, Safe Read request
   private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
@@ -264,7 +265,7 @@
   // Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
   // that have not been successfully acknowledged because of timeout for a
   // particular RS. String format: <server id>:<number of failed updates>
-  private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
+  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
     new HashMap<Integer,Integer>();
 
   /**
@@ -278,10 +279,12 @@
 
   /* Status related monitoring fields */
 
-  // Indicates the date when the status changed. This may be used to indicate
-  // the date the session with the current replication server started (when
-  // status is NORMAL for instance). All the above assured monitoring fields
-  // are also reset each time the status is changed
+  /**
+   * Indicates the date when the status changed. This may be used to indicate
+   * the date the session with the current replication server started (when
+   * status is NORMAL for instance). All the above assured monitoring fields
+   * are also reset each time the status is changed
+   */
   private Date lastStatusChangeDate = new Date();
 
   /**
@@ -583,7 +586,7 @@
   }
 
   /**
-   * Returns informations about the DS server related to the provided serverId.
+   * Returns information about the DS server related to the provided serverId.
    * based on the TopologyMsg we received when the remote replica connected or
    * disconnected. Return null when no server with the provided serverId is
    * connected.
@@ -696,8 +699,7 @@
    */
   public void setURLs(Set<String> referralsUrl)
   {
-    for (String url : referralsUrl)
-      this.refUrls.add(url);
+      this.refUrls.addAll(referralsUrl);
   }
 
   /**
@@ -793,11 +795,12 @@
           // Another server is exporting its entries to us
           InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
 
-          // This must be done while we are still holding the
-          // broker lock because we are now going to receive a
-          // bunch of entries from the remote server and we
-          // want the import thread to catch them and
-          // not the ListenerThread.
+          /*
+          This must be done while we are still holding the broker lock
+          because we are now going to receive a bunch of entries from the
+          remote server and we want the import thread to catch them and
+          not the ListenerThread.
+          */
           initialize(initTargetMsg, initTargetMsg.getSenderID());
         }
         else if (msg instanceof ErrorMsg)
@@ -805,15 +808,16 @@
           ErrorMsg errorMsg = (ErrorMsg)msg;
           if (ieContext != null)
           {
-            // This is an error termination for the 2 following cases :
-            // - either during an export
-            // - or before an import really started
-            //    For example, when we publish a request and the
-            //    replicationServer did not find the import source.
-            //
-            // A remote error during the import will be received in the
-            // receiveEntryBytes() method.
-            //
+            /*
+            This is an error termination for the 2 following cases :
+            - either during an export
+            - or before an import really started
+            For example, when we publish a request and the
+            replicationServer did not find the import source.
+
+            A remote error during the import will be received in the
+            receiveEntryBytes() method.
+            */
             if (debugEnabled())
               TRACER.debugInfo(
                   "[IE] processErrorMsg:" + this.serverID +
@@ -827,9 +831,11 @@
             }
             else
             {
-              // Simply log - happen when the ErrorMsg relates to a previous
-              // attempt of initialization while we have started a new one
-              // on this side.
+              /*
+              Simply log - happen when the ErrorMsg relates to a previous
+              attempt of initialization while we have started a new one
+              on this side.
+              */
               logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
             }
           }
@@ -864,15 +870,17 @@
       {
         // just retry
       }
-      // Test if we have received and export request message and
-      // if that's the case handle it now.
-      // This must be done outside of the portion of code protected
-      // by the broker lock so that we keep receiveing update
-      // when we are doing and export and so that a possible
-      // closure of the socket happening when we are publishing the
-      // entries to the remote can be handled by the other
-      // replay thread when they call this method and therefore the
-      // broker.receive() method.
+      /*
+      Test if we have received and export request message and
+      if that's the case handle it now.
+      This must be done outside of the portion of code protected
+      by the broker lock so that we keep receiving update
+      when we are doing and export and so that a possible
+      closure of the socket happening when we are publishing the
+      entries to the remote can be handled by the other
+      replay thread when they call this method and therefore the
+      broker.receive() method.
+      */
       if (initReqMsg != null)
       {
         // Do this work in a thread to allow replay thread continue working
@@ -898,8 +906,8 @@
    * particular server in the list. This increments the counter of error for the
    * passed server, or creates an initial value of 1 error for it if the server
    * is not yet present in the map.
-   * @param errorList
-   * @param sid
+   * @param errorsByServer map of number of errors per serverID
+   * @param sid the ID of the server which produced an error
    */
   private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
     Integer sid)
@@ -916,7 +924,7 @@
       {
         // Server already present in list, just increment number of
         // errors for the server
-        int val = serverErrCount.intValue();
+        int val = serverErrCount;
         val++;
         errorsByServer.put(sid, val);
       }
@@ -935,10 +943,7 @@
 
     // Remove the message for pending ack list (this may already make the thread
     // that is waiting for the ack be aware of its reception)
-    synchronized (waitingAckMsgs)
-    {
-      update = waitingAckMsgs.remove(changeNumber);
-    }
+    update = waitingAckMsgs.remove(changeNumber);
 
     // Signal waiting thread ack has been received
     if (update != null)
@@ -957,8 +962,10 @@
 
       if ( hasTimeout || hasReplayErrors || hasWrongStatus)
       {
-        // Some problems detected: message not correclty reached every requested
-        // servers. Log problem
+        /*
+        Some problems detected: message did not correctly reach every
+        requested servers. Log problem
+        */
         Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
             serviceID, Integer.toString(serverID),
             update.toString(), ack.errorsToString());
@@ -1021,27 +1028,6 @@
     }
   }
 
-  /**
-   * Retrieves a replication domain based on the baseDn.
-   *
-   * @param serviceID           The identifier of the domain to retrieve.
-   *
-   * @return                    The domain retrieved.
-   *
-   * @throws DirectoryException When an error occurred or no domain
-   *                            match the provided baseDn.
-   */
-  static ReplicationDomain retrievesReplicationDomain(String serviceID)
-  throws DirectoryException
-  {
-    ReplicationDomain replicationDomain = domains.get(serviceID);
-    if (replicationDomain == null)
-    {
-      throw new DirectoryException(ResultCode.OTHER,
-          ERR_NO_MATCHING_DOMAIN.get(serviceID));
-    }
-    return replicationDomain;
-  }
 
   /*
    * After this point the code is related to the Total Update.
@@ -1051,7 +1037,7 @@
    * This thread is launched when we want to export data to another server.
    *
    * When a task is created locally (so this local server is the initiator)
-   * of the export (Exemple: dsreplication initialize-all),
+   * of the export (Example: dsreplication initialize-all),
    * this thread is NOT used but the task thread is running the export instead).
    */
   private class ExportThread extends DirectoryThread
@@ -1095,9 +1081,11 @@
             initWindow);
       } catch (DirectoryException de)
       {
-        // An error message has been sent to the peer
-        // This server is not the initiator of the export so there is
-        // nothing more to do locally.
+        /*
+        An error message has been sent to the peer
+        This server is not the initiator of the export so there is
+        nothing more to do locally.
+        */
       }
 
       if (debugEnabled())
@@ -1211,29 +1199,6 @@
     /**
      * Update the counters of the task for each entry processed during
      * an import or export.
-     * @throws DirectoryException if an error occurred.
-     */
-    public void updateCounters()
-      throws DirectoryException
-    {
-      entryLeftCount--;
-
-      if (initializeTask != null)
-      {
-        if (initializeTask instanceof InitializeTask)
-        {
-          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
-        }
-        else if (initializeTask instanceof InitializeTargetTask)
-        {
-          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
-        }
-      }
-    }
-
-    /**
-     * Update the counters of the task for each entry processed during
-     * an import or export.
      *
      * @param  entriesDone The number of entries that were processed
      *                     since the last time this method was called.
@@ -1379,7 +1344,7 @@
    * on this server, and the {@code importBackend(InputStream)}
    * will be called on the remote server.
    * <p>
-   * The InputStream and OutpuStream given as a parameter to those
+   * The InputStream and OutputStream given as a parameter to those
    * methods will be connected through the replication protocol.
    *
    * @param target   The server-id of the server that should be initialized.
@@ -1394,10 +1359,7 @@
   public void initializeRemote(int target, Task initTask)
   throws DirectoryException
   {
-
     initializeRemote(target, this.serverID, initTask, this.initWindow);
-
-
   }
 
   /**
@@ -1426,10 +1388,12 @@
     // Acquire and initialize the export context
     acquireIEContext(false);
 
-    // We manage the list of servers to initialize in order :
-    // - to test at the end that all expected servers have reconnected
-    //   after their import and with the right genId
-    // - to update the task with the server(s) where this test failed
+    /*
+    We manage the list of servers to initialize in order :
+    - to test at the end that all expected servers have reconnected
+    after their import and with the right genId
+    - to update the task with the server(s) where this test failed
+    */
 
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
@@ -1526,14 +1490,15 @@
       {
         try
         {
-          // Handling the errors during export
+          /*
+          Handling the errors during export
 
-          // Note: we could have lost the connection and another thread
-          //       the listener one) has already managed to reconnect.
-          //       So we MUST rely on the test broker.isConnected()
-          //       ONLY to do 'wait to be reconnected by another thread'
-          //       (if not yet reconnected already).
-
+          Note: we could have lost the connection and another thread
+          the listener one) has already managed to reconnect.
+          So we MUST rely on the test broker.isConnected()
+          ONLY to do 'wait to be reconnected by another thread'
+          (if not yet reconnected already).
+          */
           if (!broker.isConnected())
           {
             // We are still disconnected, so we wait for the listener thread
@@ -1550,14 +1515,16 @@
           if ((initTask != null) && broker.isConnected() &&
               (serverToInitialize != RoutableMsg.ALL_SERVERS))
           {
-            // NewAttempt case : In the case where
-            // - it's not an InitializeAll
-            // - AND the previous export attempt failed
-            // - AND we are (now) connected
-            // - and we own the task and this task is not an InitializeAll
-            // Let's :
-            // - sleep to let time to the other peer to reconnect if needed
-            // - and launch another attempt
+            /*
+            NewAttempt case : In the case where
+            - it's not an InitializeAll
+            - AND the previous export attempt failed
+            - AND we are (now) connected
+            - and we own the task and this task is not an InitializeAll
+            Let's :
+            - sleep to let time to the other peer to reconnect if needed
+            - and launch another attempt
+            */
             try { Thread.sleep(1000); } catch(Exception e){}
             logError(NOTE_RESENDING_INIT_TARGET.get(
                 exportRootException.getLocalizedMessage()));
@@ -1632,8 +1599,7 @@
     int waitResultAttempt = 0;
     Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
 
-    for (Integer sid : ieContext.startList)
-      replicasWeAreWaitingFor.add(sid);
+    replicasWeAreWaitingFor.addAll(ieContext.startList);
 
     if (debugEnabled())
       TRACER.debugInfo(
@@ -1657,8 +1623,11 @@
           {
             // this one is still not doing the Full Update ... retry later
             done = false;
-            try
-            { Thread.sleep(100); } catch (InterruptedException e) {}
+            try { Thread.sleep(100);
+            }
+            catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
             waitResultAttempt++;
             break;
           }
@@ -1673,8 +1642,7 @@
     while ((!done) && (waitResultAttempt<1200) // 2mn
         && (!broker.shuttingDown()));
 
-    ieContext.failureList.addAll(
-        Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
+    ieContext.failureList.addAll(replicasWeAreWaitingFor);
 
     if (debugEnabled())
       TRACER.debugInfo(
@@ -1697,9 +1665,11 @@
       TRACER.debugInfo(
         "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
 
-    // In case some new servers appear during the init, we want them to be
-    // considered in the processing of sorting the successfully initialized
-    // and the others
+    /*
+    In case some new servers appear during the init, we want them to be
+    considered in the processing of sorting the successfully initialized
+    and the others
+    */
     for (DSInfo dsi : getReplicasList())
       replicasWeAreWaitingFor.add(dsi.getDsId());
 
@@ -1709,22 +1679,25 @@
       done = true;
       short reconnectMaxDelayInSec = 10;
       short reconnectWait = 0;
-      Integer[] servers = replicasWeAreWaitingFor.toArray(new Integer[0]);
-      for (int serverId : servers)
+      for (int serverId : replicasWeAreWaitingFor)
       {
         if (ieContext.failureList.contains(serverId))
         {
-          // this server has already been in error during initialization
-          // dont't wait for it
+          /*
+          this server has already been in error during initialization
+          don't wait for it
+          */
           continue;
         }
 
         DSInfo dsInfo = isRemoteDSConnected(serverId);
         if (dsInfo == null)
         {
-          // this server is disconnected
-          // may be for a long time if it crashed or had been stopped
-          // may be just the time to reconnect after import : should be short
+          /*
+          this server is disconnected
+          may be for a long time if it crashed or had been stopped
+          may be just the time to reconnect after import : should be short
+          */
           if (++reconnectWait<reconnectMaxDelayInSec)
           {
             // let's still wait to give a chance to this server to reconnect
@@ -1764,8 +1737,7 @@
     }
     while ((!done) && (!broker.shuttingDown())); // infinite wait
 
-    ieContext.failureList.addAll(
-        Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
+    ieContext.failureList.addAll(replicasWeAreWaitingFor);
 
     if (debugEnabled())
       TRACER.debugInfo(
@@ -1839,8 +1811,10 @@
       }
       else
       {
-        // When we are the exporter in the case of initializeAll
-        // exporting must not be stopped on the first error.
+        /*
+        When we are the exporter in the case of initializeAll
+        exporting must not be stopped on the first error.
+        */
       }
     }
   }
@@ -1889,7 +1863,7 @@
           }
         }
 
-        // Check good sequentiality of msg received
+        // Check good ordering of msg received
         if (msg instanceof EntryMsg)
         {
           EntryMsg entryMsg = (EntryMsg)msg;
@@ -1899,7 +1873,7 @@
           if (ieContext.exporterProtocolVersion >=
             ProtocolVersion.REPLICATION_PROTOCOL_V4)
           {
-            // check the msgCnt of the msg received to check sequenciality
+            // check the msgCnt of the msg received to check ordering
             if (++ieContext.msgCnt != entryMsg.getMsgId())
             {
               if (ieContext.getException() == null)
@@ -1928,16 +1902,20 @@
         }
         else if (msg instanceof DoneMsg)
         {
-          // This is the normal termination of the import
-          // No error is stored and the import is ended
-          // by returning null
+          /*
+          This is the normal termination of the import
+          No error is stored and the import is ended
+          by returning null
+          */
           return null;
         }
         else if (msg instanceof ErrorMsg)
         {
-          // This is an error termination during the import
-          // The error is stored and the import is ended
-          // by returning null
+          /*
+          This is an error termination during the import
+          The error is stored and the import is ended
+          by returning null
+          */
           if (ieContext.getException() == null)
           {
             ErrorMsg errMsg = (ErrorMsg)msg;
@@ -1971,7 +1949,6 @@
       }
       catch(Exception e)
       {
-        // TODO: i18n
         if (ieContext.getException() == null)
           ieContext.setException(new DirectoryException(ResultCode.OTHER,
             ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
@@ -1984,7 +1961,7 @@
    * This is based on the hypothesis that the entries are separated
    * by a "\n\n" String.
    *
-   * @param   entryBytes
+   * @param   entryBytes the set of bytes containing one or more entries.
    * @return  The number of entries in the provided byte[].
    */
   private int countEntryLimits(byte[] entryBytes)
@@ -1997,7 +1974,7 @@
    * This is based on the hypothesis that the entries are separated
    * by a "\n\n" String.
    *
-   * @param   entryBytes
+   * @param   entryBytes the set of bytes containing one or more entries.
    * @return  The number of entries in the provided byte[].
    */
   private int countEntryLimits(byte[] entryBytes, int pos, int length)
@@ -2029,7 +2006,8 @@
   throws IOException
   {
     if (debugEnabled())
-      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
+      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" +
+          Arrays.toString(lDIFEntry));
 
     // build the message
     EntryMsg entryMessage = new EntryMsg(
@@ -2039,9 +2017,11 @@
     // Waiting the slowest loop
     while (!broker.shuttingDown())
     {
-      // If an error was raised - like receiving an ErrorMsg from a remote
-      // server that have been stored by the listener thread in the ieContext,
-      // we just abandon the export by throwing an exception.
+      /*
+      If an error was raised - like receiving an ErrorMsg from a remote
+      server that have been stored by the listener thread in the ieContext,
+      we just abandon the export by throwing an exception.
+      */
       if (ieContext.getException() != null)
         throw(new IOException(ieContext.getException().getMessage()));
 
@@ -2094,7 +2074,8 @@
     } // Waiting the slowest loop
 
     if (debugEnabled())
-      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
+      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
+          + Arrays.toString(lDIFEntry));
 
     // publish the message
     boolean sent = broker.publish(entryMessage, false);
@@ -2212,18 +2193,22 @@
       errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
     }
 
-    // We must not test here whether the remote source is connected to
-    // the topology by testing if it stands in the replicas list since.
-    // In the case of a re-attempt of initialization, the listener thread is
-    // running this method directly coming from initailize() method and did
-    // not processed any topology message in between the failure and the
-    // new attempt.
+    /*
+    We must not test here whether the remote source is connected to
+    the topology by testing if it stands in the replicas list since.
+    In the case of a re-attempt of initialization, the listener thread is
+    running this method directly coming from initialize() method and did
+    not processed any topology message in between the failure and the
+    new attempt.
+    */
     try
     {
-      // We must immediatly acquire a context to store the task inside
-      // The context will be used when we (the listener thread) will receive
-      // the InitializeTargetMsg, process the import, and at the end
-      // update the task.
+      /*
+      We must immediately acquire a context to store the task inside
+      The context will be used when we (the listener thread) will receive
+      the InitializeTargetMsg, process the import, and at the end
+      update the task.
+      */
 
       acquireIEContext(true);  //test and set if no import already in progress
       ieContext.initializeTask = initTask;
@@ -2234,11 +2219,13 @@
       // Publish Init request msg
       broker.publish(ieContext.initReqMsgSent);
 
-      // The normal success processing is now to receive InitTargetMsg then
-      // entries from the remote server.
-      // The error cases are :
-      // - either local error immediatly caught below
-      // - a remote error we will receive as an ErrorMsg
+      /*
+      The normal success processing is now to receive InitTargetMsg then
+      entries from the remote server.
+      The error cases are :
+      - either local error immediately caught below
+      - a remote error we will receive as an ErrorMsg
+      */
     }
     catch(DirectoryException de)
     {
@@ -2272,15 +2259,15 @@
    *
    * @param initTargetMsgReceived The message received from the remote server.
    *
-   * @param requestorServerId The serverId of the server that requested the
+   * @param requesterServerId The serverId of the server that requested the
    *                          initialization meaning the server where the
    *                          task has initially been created (this server,
    *                          or the remote server).
    */
   void initialize(InitializeTargetMsg initTargetMsgReceived,
-      int requestorServerId)
+      int requesterServerId)
   {
-    InitializeTask initFromtask = null;
+    InitializeTask initFromTask = null;
 
     if (debugEnabled())
       TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
@@ -2300,16 +2287,20 @@
       // Acquire an import context if no already done (and initialize).
       if (initTargetMsgReceived.getInitiatorID() == this.serverID)
       {
-        // The initTargetMsgReceived received is the answer to a request that
-        // we (this server) sent previously. In this case, so the IEContext
-        // has been already acquired when the request was published in order
-        // to store the task (to be updated with the status at the end).
+        /*
+        The initTargetMsgReceived received is the answer to a request that
+        we (this server) sent previously. In this case, so the IEContext
+        has been already acquired when the request was published in order
+        to store the task (to be updated with the status at the end).
+        */
       }
       else
       {
-        // The initTargetMsgReceived is for an import initiated by the remote
-        // server.
-        // Test and set if no import already in progress
+        /*
+        The initTargetMsgReceived is for an import initiated by the remote
+        server.
+        Test and set if no import already in progress
+        */
         acquireIEContext(true);
       }
 
@@ -2319,16 +2310,18 @@
       ieContext.initWindow = initTargetMsgReceived.getInitWindow();
       // Protocol version is -1 when not known.
       ieContext.exporterProtocolVersion = getProtocolVersion(source);
-      initFromtask = (InitializeTask)ieContext.initializeTask;
+      initFromTask = (InitializeTask)ieContext.initializeTask;
 
-      // Lauch the import
+      // Launch the import
       importBackend(new ReplInputStream(this));
 
     }
     catch (DirectoryException e)
     {
-      // Store the exception raised. It will be considered if no other exception
-      // has been previously stored in  the context
+      /*
+      Store the exception raised. It will be considered if no other exception
+      has been previously stored in  the context
+      */
       if (ieContext.getException() == null)
         ieContext.setException(e);
     }
@@ -2339,30 +2332,37 @@
           + " ends import with exception=" + ieContext.getException()
           + " connected=" + broker.isConnected());
 
-      // It is necessary to restart (reconnect to RS) for different reasons
-      //   - when everything went well, reconnect in order to exchange
-      //     new state, new generation ID
-      //   - when we have connection failure, reconnect to retry a new import
-      //     right here, right now
-      // we never want retryOnFailure if we fails reconnecting in the restart.
+      /*
+      It is necessary to restart (reconnect to RS) for different reasons
+      - when everything went well, reconnect in order to exchange
+      new state, new generation ID
+      - when we have connection failure, reconnect to retry a new import
+      right here, right now
+      we never want retryOnFailure if we fails reconnecting in the restart.
+      */
       broker.reStart(false);
 
       if (ieContext.getException() != null)
       {
-        if (broker.isConnected() && (initFromtask != null)
+        if (broker.isConnected() && (initFromTask != null)
             && (++ieContext.attemptCnt<2))
         {
-          // Worth a new attempt
-          // since initFromtask is in this server, connection is ok
+          /*
+          Worth a new attempt
+          since initFromTask is in this server, connection is ok
+          */
           try
           {
-
-            // Wait for the exporter to stabilize - eventually reconnect as
-            // well if it was connected to the same RS than the one we lost ...
+            /*
+            Wait for the exporter to stabilize - eventually reconnect as
+            well if it was connected to the same RS than the one we lost ...
+            */
             Thread.sleep(1000);
 
-            // Restart the whole import protocol exchange by sending again
-            // the request
+            /*
+            Restart the whole import protocol exchange by sending again
+            the request
+            */
             logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                 ieContext.getException().getLocalizedMessage()));
 
@@ -2378,8 +2378,10 @@
           }
           catch(Exception e)
           {
-            // An error occurs when sending a new request for a new import.
-            // This error is not stored, prefering to keep the initial one.
+            /*
+            An error occurs when sending a new request for a new import.
+            This error is not stored, prefering to keep the initial one.
+            */
             logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
               e.getLocalizedMessage(),
               ieContext.getException().getLocalizedMessage()));
@@ -2394,7 +2396,7 @@
         TRACER.debugInfo("[IE] Domain=" + this
           + " ends initialization with exception=" + ieContext.getException()
           + " connected=" + broker.isConnected()
-          + " task=" + initFromtask
+          + " task=" + initFromTask
           + " attempt=" + ieContext.attemptCnt);
 
       try
@@ -2402,24 +2404,28 @@
         if (broker.isConnected() && (ieContext.getException() != null))
         {
           // Let's notify the exporter
-          ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
+          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
               ieContext.getException().getMessageObject());
           broker.publish(errorMsg);
         }
         else // !broker.isConnected()
         {
-          // Don't try to reconnect here.
-          // The current running thread is the listener thread and will loop on
-          // receive() that is expected to manage reconnects attempt.
+          /*
+          Don't try to reconnect here.
+          The current running thread is the listener thread and will loop on
+          receive() that is expected to manage reconnects attempt.
+          */
         }
 
-        // Update the task that initiated the import must be the last thing.
-        // Particularly, broker.restart() after import success must be done
-        // before some other operations/tasks to be launched,
-        // like resetting the generation ID.
-        if (initFromtask != null)
+        /*
+        Update the task that initiated the import must be the last thing.
+        Particularly, broker.restart() after import success must be done
+        before some other operations/tasks to be launched,
+        like resetting the generation ID.
+        */
+        if (initFromTask != null)
         {
-          initFromtask.updateTaskCompletionState(ieContext.getException());
+          initFromTask.updateTaskCompletionState(ieContext.getException());
         }
       }
       finally
@@ -2435,10 +2441,10 @@
   }
 
   /**
-   * Return the protocol version of the DS related to the provided serverid.
+   * Return the protocol version of the DS related to the provided serverId.
    * Returns -1 when the protocol version is not known.
-   * @param dsServerId The provided serverid.
-   * @return The procotol version.
+   * @param dsServerId The provided serverId.
+   * @return The protocol version.
    */
   short getProtocolVersion(int dsServerId)
   {
@@ -2515,11 +2521,11 @@
   private void checkGenerationID(long generationID)
   throws DirectoryException
   {
-    boolean allset = true;
+    boolean allSet = true;
 
     for (int i = 0; i< 50; i++)
     {
-      allset = true;
+      allSet = true;
       for (RSInfo rsInfo : getRsList())
       {
         // the 'empty' RSes (generationId==-1) are considered as good citizens
@@ -2532,16 +2538,16 @@
           } catch (InterruptedException e)
           {
           }
-          allset = false;
+          allSet = false;
           break;
         }
       }
-      if (allset)
+      if (allSet)
       {
         break;
       }
     }
-    if (!allset)
+    if (!allSet)
     {
       ResultCode resultCode = ResultCode.OTHER;
       Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
@@ -2735,9 +2741,11 @@
    */
   void processUpdateDoneSynchronous(UpdateMsg msg)
   {
-    // Warning: in synchronous mode, no way to tell the replay of an update went
-    // wrong Just put null in processUpdateDone so that if assured replication
-    // is used the ack is sent without error at replay flag.
+    /*
+    Warning: in synchronous mode, no way to tell the replay of an update went
+    wrong Just put null in processUpdateDone so that if assured replication
+    is used the ack is sent without error at replay flag.
+    */
     processUpdateDone(msg, null);
     state.update(msg.getChangeNumber());
   }
@@ -2749,10 +2757,7 @@
    */
   public boolean isConnected()
   {
-    if (broker != null)
-      return broker.isConnected();
-    else
-      return false;
+    return broker != null && broker.isConnected();
   }
 
   /**
@@ -2764,10 +2769,7 @@
    */
   public boolean hasConnectionError()
   {
-    if (broker != null)
-      return broker.hasConnectionError();
-    else
-      return true;
+    return broker == null || broker.hasConnectionError();
   }
 
   /**
@@ -2852,24 +2854,16 @@
   /**
    * Gets the number of updates sent in assured safe read mode that have not
    * been acknowledged per server.
-   * @return The number of updates sent in assured safe read mode that have not
-   * been acknowledged per server.
+   * @return A copy of the map that contains the number of updates sent in
+   * assured safe read mode that have not been acknowledged per server.
    */
   public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
   {
-    // Clone a snapshot with synchronized section to have a consistent view in
-    // monitoring
-    Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
     synchronized(assuredSrServerNotAcknowledgedUpdates)
     {
-      Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
-      for (Integer serverId : keySet)
-      {
-        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
-        snapshot.put(serverId, i);
-      }
+      return new HashMap<Integer, Integer>(
+          assuredSrServerNotAcknowledgedUpdates);
     }
-    return snapshot;
   }
 
   /**
@@ -2937,24 +2931,16 @@
   /**
    * Gets the number of updates sent in assured safe data mode that have not
    * been acknowledged due to timeout error per server.
-   * @return The number of updates sent in assured safe data mode that have not
-   * been acknowledged due to timeout error per server.
+   * @return A copy of the map that contains the number of updates sent in
+   * assured safe data mode that have not been acknowledged due to timeout
+   * error per server.
    */
   public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
   {
-    // Clone a snapshot with synchronized section to have a consistent view in
-    // monitoring
-    Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
     synchronized(assuredSdServerTimeoutUpdates)
     {
-      Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet();
-      for (Integer serverId : keySet)
-      {
-        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
-        snapshot.put(serverId, i);
-      }
+      return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates);
     }
-    return snapshot;
   }
 
   /**
@@ -2981,14 +2967,20 @@
     assuredSrTimeoutUpdates = new AtomicInteger(0);
     assuredSrWrongStatusUpdates = new AtomicInteger(0);
     assuredSrReplayErrorUpdates = new AtomicInteger(0);
-    assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>();
+    synchronized (assuredSrServerNotAcknowledgedUpdates)
+    {
+      assuredSrServerNotAcknowledgedUpdates.clear();
+    }
     assuredSrReceivedUpdates = new AtomicInteger(0);
     assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
     assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
     assuredSdSentUpdates = new AtomicInteger(0);
     assuredSdAcknowledgedUpdates = new AtomicInteger(0);
     assuredSdTimeoutUpdates = new AtomicInteger(0);
-    assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>();
+    synchronized (assuredSdServerTimeoutUpdates)
+    {
+      assuredSdServerTimeoutUpdates.clear();
+    }
   }
 
   /*
@@ -3080,8 +3072,10 @@
   {
     synchronized (sessionLock)
     {
-      // Stop the broker first in order to prevent the listener from
-      // reconnecting - see OPENDJ-457.
+      /*
+      Stop the broker first in order to prevent the listener from
+      reconnecting - see OPENDJ-457.
+      */
       if (broker != null)
       {
         broker.stop();
@@ -3263,8 +3257,10 @@
   {
     broker.updateWindowAfterReplay();
 
-    // Send an ack if it was requested and the group id is the same of the RS
-    // one. Only Safe Read mode makes sense in DS for returning an ack.
+    /*
+    Send an ack if it was requested and the group id is the same of the RS
+    one. Only Safe Read mode makes sense in DS for returning an ack.
+    */
     byte rsGroupId = broker.getRsGroupId();
     if (msg.isAssured())
     {
@@ -3282,9 +3278,9 @@
             if (replayErrorMsg != null)
             {
               // Mark the error in the ack
-              //   -> replay error occured
+              //   -> replay error occurred
               ackMsg.setHasReplayError(true);
-              //   -> replay error occured in our server
+              //   -> replay error occurred in our server
               List<Integer> idList = new ArrayList<Integer>();
               idList.add(serverID);
               ackMsg.setFailedServers(idList);
@@ -3306,8 +3302,10 @@
           logError(errorMsg);
         } else
         {
-          // In safe data mode assured update that comes up to a DS requires no
-          // ack from a destinator DS. Safe data mode is based on RS acks only
+          /*
+          In safe data mode assured update that comes up to a DS requires no
+          ack from a recipient DS. Safe data mode is based on RS acks only
+          */
         }
       }
     }
@@ -3343,7 +3341,7 @@
      * If assured configured, set message accordingly to request an ack in the
      * right assured mode.
      * No ack requested for a RS with a different group id. Assured
-     * replication suported for the same locality, i.e: a topology working in
+     * replication supported for the same locality, i.e: a topology working in
      * the same
      * geographical location). If we are connected to a RS which is not in our
      * locality, no need to ask for an ack.
@@ -3355,12 +3353,11 @@
       if (assuredMode == AssuredMode.SAFE_DATA_MODE)
         msg.setSafeDataLevel(assuredSdLevel);
 
-      // Add the assured message to the list of update that are
-      // waiting for acks
-      synchronized (waitingAckMsgs)
-      {
-        waitingAckMsgs.put(msg.getChangeNumber(), msg);
-      }
+      /*
+      Add the assured message to the list of update that are
+      waiting for acks
+      */
+      waitingAckMsgs.put(msg.getChangeNumber(), msg);
     }
   }
 
@@ -3410,8 +3407,10 @@
       {
         try
         {
-          // WARNING: this timeout may be difficult to optimize: too low, it
-          // may use too much CPU, too high, it may penalize performance...
+          /*
+          WARNING: this timeout may be difficult to optimize: too low, it
+          may use too much CPU, too high, it may penalize performance...
+          */
           msg.wait(10);
         } catch (InterruptedException e)
         {
@@ -3425,14 +3424,13 @@
         // Timeout ?
         if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
         {
-          // Timeout occured, be sure that ack is not being received and if so,
-          // remove the update from the wait list, log the timeout error and
-          // also update assured monitoring counters
+          /*
+          Timeout occurred, be sure that ack is not being received and if so,
+          remove the update from the wait list, log the timeout error and
+          also update assured monitoring counters
+          */
           UpdateMsg update;
-          synchronized (waitingAckMsgs)
-          {
-            update = waitingAckMsgs.remove(cn);
-          }
+          update = waitingAckMsgs.remove(cn);
 
           if (update != null)
           {
@@ -3490,9 +3488,9 @@
   }
 
   /**
-   * Publish informations to the Replication Service (not assured mode).
+   * Publish information to the Replication Service (not assured mode).
    *
-   * @param msg  The byte array containing the informations that should
+   * @param msg  The byte array containing the information that should
    *             be sent to the remote entities.
    */
   public void publish(byte[] msg)
@@ -3501,10 +3499,11 @@
     synchronized (this)
     {
       update = new UpdateMsg(generator.newChangeNumber(), msg);
-
-      // If assured replication is configured, this will prepare blocking
-      // mechanism. If assured replication is disabled, this returns
-      // immediately
+      /*
+      If assured replication is configured, this will prepare blocking
+      mechanism. If assured replication is disabled, this returns
+      immediately
+      */
       prepareWaitForAckIfAssuredEnabled(update);
 
       publish(update);
@@ -3512,16 +3511,18 @@
 
     try
     {
-      // If assured replication is enabled, this will wait for the matching
-      // ack or time out. If assured replication is disabled, this returns
-      // immediately
+      /*
+      If assured replication is enabled, this will wait for the matching
+      ack or time out. If assured replication is disabled, this returns
+      immediately
+      */
       waitForAckIfAssuredEnabled(update);
     } catch (TimeoutException ex)
     {
       // This exception may only be raised if assured replication is
       // enabled
       Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
-        assuredTimeout), msg.toString());
+        assuredTimeout), update.toString());
       logError(errorMsg);
     }
   }
@@ -3557,10 +3558,7 @@
    */
   public boolean importInProgress()
   {
-    if (ieContext == null)
-      return false;
-    else
-      return ieContext.importInProgress;
+    return ieContext != null && ieContext.importInProgress;
   }
 
   /**
@@ -3572,10 +3570,7 @@
    */
   public boolean exportInProgress()
   {
-    if (ieContext == null)
-      return false;
-    else
-      return !ieContext.importInProgress;
+    return ieContext != null && !ieContext.importInProgress;
   }
 
   /**

--
Gitblit v1.10.0