From a5131f44a6afa554af8f4c82c7ffd3d4ceac1bd4 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 04 Feb 2011 12:50:58 +0000
Subject: [PATCH] OPEN - issue OPENDJ-26: Fix OpenDS issue 4585: ConcurrentModificationException in ReplicationBroker  https://bugster.forgerock.org/jira/browse/OPENDJ-26

---
 opends/src/server/org/opends/server/replication/protocol/SocketSession.java                |    3 
 opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java            |   38 +-
 opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java                 |   42 ++-
 opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java                  |  359 +++++++++++++++--------------
 opends/src/server/org/opends/server/replication/service/ListenerThread.java                |   24 +
 opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java             |   12 
 opends/src/server/org/opends/server/replication/server/ServerReader.java                   |    5 
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java             |   54 ++--
 opends/src/server/org/opends/server/replication/server/ServerWriter.java                   |    9 
 opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java  |    3 
 opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java              |   19 
 opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java |    3 
 opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java             |    3 
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java          |   47 ++-
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java             |   35 +-
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java                  |    1 
 opends/src/server/org/opends/server/replication/common/ServerState.java                    |    5 
 opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java               |    6 
 opends/src/server/org/opends/server/replication/plugin/ReplayThread.java                   |    7 
 opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java    |   39 --
 20 files changed, 369 insertions(+), 345 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 16861da..e8fea1c 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.common;
 
@@ -48,8 +49,8 @@
  */
 public class ServerState implements Iterable<Integer>
 {
-  private HashMap<Integer, ChangeNumber> list;
-  private boolean saved = true;
+  private final HashMap<Integer, ChangeNumber> list;
+  private volatile boolean saved = true;
 
   /**
    * Creates a new empty ServerState.
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 29e756b..27d3451 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -191,8 +191,8 @@
    */
   private class ScanSearchListener implements InternalSearchListener
   {
-    private ChangeNumber startingChangeNumber = null;
-    private ChangeNumber endChangeNumber = null;
+    private final ChangeNumber startingChangeNumber;
+    private final ChangeNumber endChangeNumber;
 
     public ScanSearchListener(
         ChangeNumber startingChangeNumber,
@@ -262,8 +262,8 @@
   private final PersistentServerState state;
   private int numReplayedPostOpCalled = 0;
 
-  private long generationId = -1;
-  private boolean generationIdSavedStatus = false;
+  private volatile long generationId = -1;
+  private volatile boolean generationIdSavedStatus = false;
 
   private final ChangeNumberGenerator generator;
 
@@ -289,15 +289,15 @@
 
   private final DN baseDn;
 
-  private boolean shutdown = false;
+  private volatile boolean shutdown = false;
 
   private final InternalClientConnection conn =
       InternalClientConnection.getRootConnection();
 
   private boolean solveConflictFlag = true;
 
-  private boolean disabled = false;
-  private boolean stateSavingDisabled = false;
+  private volatile boolean disabled = false;
+  private volatile boolean stateSavingDisabled = false;
 
   // This list is used to temporary store operations that needs
   // to be replayed at session establishment time.
@@ -311,7 +311,7 @@
    * Possible values are accept-updates or deny-updates, but other values
    * may be added in the future.
    */
-  private IsolationPolicy isolationpolicy;
+  private IsolationPolicy isolationPolicy;
 
   /**
    * The DN of the configuration entry of this domain.
@@ -323,7 +323,7 @@
    * A boolean indicating if the thread used to save the persistentServerState
    * is terminated.
    */
-  private boolean done = true;
+  private volatile boolean done = true;
 
   private ServerStateFlush flushThread;
 
@@ -374,7 +374,7 @@
    * fractional configuration (i.e with compliant fractional configuration in
    * domain root entry).
    */
-  private boolean force_bad_data_set = false;
+  private boolean forceBadDataSet = false;
 
   /**
    * This flag is used by the fractional replication ldif import plugin to
@@ -447,21 +447,24 @@
     {
       done = false;
 
-      while (shutdown  == false)
+      while (shutdown == false)
       {
         try
         {
           synchronized (this)
           {
             this.wait(1000);
-            if (!disabled && !stateSavingDisabled )
+            if (!disabled && !stateSavingDisabled)
             {
               // save the ServerState
               state.save();
             }
           }
-        } catch (InterruptedException e)
-        { }
+        }
+        catch (InterruptedException e)
+        {
+          // Thread interrupted: check for shutdown.
+        }
       }
       state.save();
 
@@ -475,7 +478,7 @@
    */
   private class RSUpdater extends DirectoryThread
   {
-    private ChangeNumber startChangeNumber;
+    private final ChangeNumber startChangeNumber;
     protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
     {
       super("Replication Server Updater for server id " +
@@ -571,7 +574,7 @@
     this.baseDn = configuration.getBaseDN();
     int window  = configuration.getWindowSize();
     heartbeatInterval = configuration.getHeartbeatInterval();
-    this.isolationpolicy = configuration.getIsolationPolicy();
+    this.isolationPolicy = configuration.getIsolationPolicy();
     this.configDn = configuration.dn();
     this.logChangeNumber = configuration.isLogChangenumber();
     this.updateToReplayQueue = updateToReplayQueue;
@@ -2030,12 +2033,12 @@
    */
   private boolean brokerIsConnected(PreOperationOperation op)
   {
-    if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
+    if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
     {
       // this policy imply that we always accept updates.
       return true;
     }
-    if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
+    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
     {
       // this isolation policy specifies that the updates are denied
       // when the broker had problems during the connection phase
@@ -4429,7 +4432,7 @@
   public ConfigChangeResult applyConfigurationChange(
          ReplicationDomainCfg configuration)
   {
-    isolationpolicy = configuration.getIsolationPolicy();
+    isolationPolicy = configuration.getIsolationPolicy();
     logChangeNumber = configuration.isLogChangenumber();
     histPurgeDelayInMilliSec =
       configuration.getConflictsHistoricalPurgeDelay()*60*1000;
@@ -4657,7 +4660,7 @@
   {
     // Check domain fractional configuration consistency with local
     // configuration variables
-    force_bad_data_set = !isBackendFractionalConfigConsistent();
+    forceBadDataSet = !isBackendFractionalConfigConsistent();
 
     super.sessionInitiated(
         initStatus, replicationServerState,generationID, session);
@@ -4687,7 +4690,7 @@
     }
 
     // Now for bad data set status if needed
-    if (force_bad_data_set)
+    if (forceBadDataSet)
     {
       // Go into bad data set status
       setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
@@ -4950,7 +4953,7 @@
   {
     // Ignore message if fractional configuration is inconcsistent and
     // we have been passed into bad data set status
-    if (force_bad_data_set)
+    if (forceBadDataSet)
     {
       return false;
     }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 1899935..c964423 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 import org.opends.server.replication.protocol.LDAPUpdateMsg;
@@ -54,9 +55,9 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
-  private boolean shutdown = false;
-  private boolean done = false;
+  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
+  private volatile boolean shutdown = false;
+  private volatile boolean done = false;
   private static int count = 0;
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
index 518435b..5a0688b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2007-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 
 package org.opends.server.replication.protocol;
@@ -54,25 +55,25 @@
   /**
    * The session on which heartbeats are to be monitored.
    */
-  private ProtocolSession session;
+  private final ProtocolSession session;
 
 
   /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
    */
-  private long heartbeatInterval;
+  private final long heartbeatInterval;
 
 
   /**
    * Set this to stop the thread.
    */
-  private boolean shutdown = false;
+  private volatile boolean shutdown = false;
 
   /**
    * Send StopMsg before session closure or not.
    */
-  private boolean sendStopBeforeClose = false;
+  private final boolean sendStopBeforeClose;
 
 
   /**
@@ -131,7 +132,8 @@
               try
               {
                 session.publish(new StopMsg());
-              } catch(IOException ioe)
+              }
+              catch (IOException ioe)
               {
                 // Anyway, going to close session, so nothing to do
               }
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 8b97568..778a5a4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 
 package org.opends.server.replication.protocol;
@@ -50,25 +51,25 @@
   /**
    * For test purposes only to simulate loss of heartbeats.
    */
-  static private boolean heartbeatsDisabled = false;
+  private static volatile boolean heartbeatsDisabled = false;
 
   /**
    * The session on which heartbeats are to be sent.
    */
-  private ProtocolSession session;
+  private final ProtocolSession session;
 
 
   /**
    * The time in milliseconds between heartbeats.
    */
-  private long heartbeatInterval;
+  private final long heartbeatInterval;
 
 
   /**
    * Set this to stop the thread.
    */
-  private Boolean shutdown = false;
-  private final Object shutdown_lock = new Object();
+  private volatile boolean shutdown = false;
+  private final Object shutdownLock = new Object();
 
 
   /**
@@ -136,11 +137,11 @@
             TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
           }
 
-          synchronized (shutdown_lock)
+          synchronized (shutdownLock)
           {
             if (!shutdown)
             {
-              shutdown_lock.wait(sleepTime);
+              shutdownLock.wait(sleepTime);
             }
           }
         }
@@ -174,10 +175,10 @@
    */
   public void shutdown()
   {
-    synchronized (shutdown_lock)
+    synchronized (shutdownLock)
     {
       shutdown = true;
-      shutdown_lock.notifyAll();
+      shutdownLock.notifyAll();
       if (debugEnabled())
       {
         TRACER.debugInfo("Going to notify Heartbeat thread.");
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 18cf6d3..9e2a936 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
 
@@ -59,7 +60,7 @@
   /**
    * The time the last message published to this session.
    */
-  private long lastPublishTime = 0;
+  private volatile long lastPublishTime = 0;
 
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index ff6ea07..adf16b2 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
 
@@ -62,7 +63,7 @@
   /**
    * The time the last message published to this session.
    */
-  private long lastPublishTime = 0;
+  private volatile long lastPublishTime = 0;
 
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index c26fd77..2fdf43b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,16 +23,14 @@
  *
  *
  *      Copyright 2007-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.replication.common.AssuredMode;
@@ -60,9 +58,9 @@
 public class TopologyMsg extends ReplicationMsg
 {
   // Information for the DS known in the topology
-  private List<DSInfo> dsList = new ArrayList<DSInfo>();
+  private final List<DSInfo> dsList;
   // Information for the RS known in the topology
-  private List<RSInfo> rsList = new ArrayList<RSInfo>();
+  private final List<RSInfo> rsList;
 
   /**
    * Creates a new changelogInfo message from its encoded form.
@@ -74,7 +72,168 @@
    */
   public TopologyMsg(byte[] in, short version) throws DataFormatException
   {
-    decode(in, version);
+    try
+    {
+      /* First byte is the type */
+      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
+      {
+        throw new DataFormatException(
+          "Input is not a valid " + this.getClass().getCanonicalName());
+      }
+
+      int pos = 1;
+
+      /* Read number of following DS info entries */
+
+      byte nDsInfo = in[pos++];
+
+      /* Read the DS info entries */
+      List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
+      while ( (nDsInfo > 0) && (pos < in.length) )
+      {
+        /* Read DS id */
+        int length = getNextLength(in, pos);
+        String serverIdString = new String(in, pos, length, "UTF-8");
+        int dsId = Integer.valueOf(serverIdString);
+        pos += length + 1;
+
+        /* Read RS id */
+        length =
+          getNextLength(in, pos);
+        serverIdString =
+          new String(in, pos, length, "UTF-8");
+        int rsId = Integer.valueOf(serverIdString);
+        pos += length + 1;
+
+        /* Read the generation id */
+        length = getNextLength(in, pos);
+        long generationId =
+          Long.valueOf(new String(in, pos, length,
+          "UTF-8"));
+        pos += length + 1;
+
+        /* Read DS status */
+        ServerStatus status = ServerStatus.valueOf(in[pos++]);
+
+        /* Read DS assured flag */
+        boolean assuredFlag;
+        if (in[pos++] == 1)
+        {
+          assuredFlag = true;
+        } else
+        {
+          assuredFlag = false;
+        }
+
+        /* Read DS assured mode */
+        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
+
+        /* Read DS safe data level */
+        byte safeDataLevel = in[pos++];
+
+        /* Read DS group id */
+        byte groupId = in[pos++];
+
+        /* Read number of referrals URLs */
+        List<String> refUrls = new ArrayList<String>();
+        byte nUrls = in[pos++];
+        byte nRead = 0;
+        /* Read urls until expected number read */
+        while ((nRead != nUrls) &&
+          (pos < in.length) //security
+          )
+        {
+          length = getNextLength(in, pos);
+          String url = new String(in, pos, length, "UTF-8");
+          refUrls.add(url);
+          pos += length + 1;
+          nRead++;
+        }
+
+        Set<String> attrs = new HashSet<String>();
+        short protocolVersion = -1;
+        if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          byte nAttrs = in[pos++];
+          nRead = 0;
+          /* Read attrs until expected number read */
+          while ((nRead != nAttrs) &&
+            (pos < in.length) //security
+            )
+          {
+            length = getNextLength(in, pos);
+            String attr = new String(in, pos, length, "UTF-8");
+            attrs.add(attr);
+            pos += length + 1;
+            nRead++;
+          }
+          /* Read Protocol version */
+          protocolVersion = Short.valueOf(in[pos++]);
+        }
+
+        /* Now create DSInfo and store it in list */
+
+        DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
+          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
+          protocolVersion);
+        dsList.add(dsInfo);
+
+        nDsInfo--;
+      }
+
+      /* Read number of following RS info entries */
+
+      byte nRsInfo = in[pos++];
+
+      /* Read the RS info entries */
+      List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+      while ( (nRsInfo > 0) && (pos < in.length) )
+      {
+        /* Read RS id */
+        int length = getNextLength(in, pos);
+        String serverIdString = new String(in, pos, length, "UTF-8");
+        int id = Integer.valueOf(serverIdString);
+        pos += length + 1;
+
+        /* Read the generation id */
+        length = getNextLength(in, pos);
+        long generationId =
+          Long.valueOf(new String(in, pos, length,
+          "UTF-8"));
+        pos += length + 1;
+
+        /* Read RS group id */
+        byte groupId = in[pos++];
+
+        int weight = 1;
+        String serverUrl = null;
+        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          length = getNextLength(in, pos);
+          serverUrl = new String(in, pos, length, "UTF-8");
+          pos += length + 1;
+
+          /* Read RS weight */
+          length = getNextLength(in, pos);
+          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+          pos += length + 1;
+        }
+
+        /* Now create RSInfo and store it in list */
+
+        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
+          weight);
+        rsList.add(rsInfo);
+
+        nRsInfo--;
+      }
+
+      this.dsList = Collections.unmodifiableList(dsList);
+      this.rsList = Collections.unmodifiableList(rsList);
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
   }
 
   /**
@@ -85,10 +244,23 @@
    */
   public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
   {
-    if (dsList != null) // null means no info, let empty list from init time
-      this.dsList = dsList;
-    if (rsList != null) // null means no info, let empty list from init time
-      this.rsList = rsList;
+    if (dsList == null || dsList.isEmpty())
+    {
+      this.dsList = Collections.emptyList();
+    }
+    else
+    {
+      this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+    }
+
+    if (rsList == null || rsList.isEmpty())
+    {
+      this.rsList = Collections.emptyList();
+    }
+    else
+    {
+      this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+    }
   }
 
   // ============
@@ -219,172 +391,7 @@
 
   }
 
-  // ============
-  // Msg decoding
-  // ============
 
-  private void decode(byte[] in, short version)
-  throws DataFormatException
-  {
-    try
-    {
-      /* First byte is the type */
-      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
-      {
-        throw new DataFormatException(
-          "Input is not a valid " + this.getClass().getCanonicalName());
-      }
-
-      int pos = 1;
-
-      /* Read number of following DS info entries */
-
-      byte nDsInfo = in[pos++];
-
-      /* Read the DS info entries */
-      while ( (nDsInfo > 0) && (pos < in.length) )
-      {
-        /* Read DS id */
-        int length = getNextLength(in, pos);
-        String serverIdString = new String(in, pos, length, "UTF-8");
-        int dsId = Integer.valueOf(serverIdString);
-        pos += length + 1;
-
-        /* Read RS id */
-        length =
-          getNextLength(in, pos);
-        serverIdString =
-          new String(in, pos, length, "UTF-8");
-        int rsId = Integer.valueOf(serverIdString);
-        pos += length + 1;
-
-        /* Read the generation id */
-        length = getNextLength(in, pos);
-        long generationId =
-          Long.valueOf(new String(in, pos, length,
-          "UTF-8"));
-        pos += length + 1;
-
-        /* Read DS status */
-        ServerStatus status = ServerStatus.valueOf(in[pos++]);
-
-        /* Read DS assured flag */
-        boolean assuredFlag;
-        if (in[pos++] == 1)
-        {
-          assuredFlag = true;
-        } else
-        {
-          assuredFlag = false;
-        }
-
-        /* Read DS assured mode */
-        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
-
-        /* Read DS safe data level */
-        byte safeDataLevel = in[pos++];
-
-        /* Read DS group id */
-        byte groupId = in[pos++];
-
-        /* Read number of referrals URLs */
-        List<String> refUrls = new ArrayList<String>();
-        byte nUrls = in[pos++];
-        byte nRead = 0;
-        /* Read urls until expected number read */
-        while ((nRead != nUrls) &&
-          (pos < in.length) //security
-          )
-        {
-          length = getNextLength(in, pos);
-          String url = new String(in, pos, length, "UTF-8");
-          refUrls.add(url);
-          pos += length + 1;
-          nRead++;
-        }
-
-        Set<String> attrs = new HashSet<String>();
-        short protocolVersion = -1;
-        if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          byte nAttrs = in[pos++];
-          nRead = 0;
-          /* Read attrs until expected number read */
-          while ((nRead != nAttrs) &&
-            (pos < in.length) //security
-            )
-          {
-            length = getNextLength(in, pos);
-            String attr = new String(in, pos, length, "UTF-8");
-            attrs.add(attr);
-            pos += length + 1;
-            nRead++;
-          }
-          /* Read Protocol version */
-          protocolVersion = Short.valueOf(in[pos++]);
-        }
-
-        /* Now create DSInfo and store it in list */
-
-        DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
-          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
-          protocolVersion);
-        dsList.add(dsInfo);
-
-        nDsInfo--;
-      }
-
-      /* Read number of following RS info entries */
-
-      byte nRsInfo = in[pos++];
-
-      /* Read the RS info entries */
-      while ( (nRsInfo > 0) && (pos < in.length) )
-      {
-        /* Read RS id */
-        int length = getNextLength(in, pos);
-        String serverIdString = new String(in, pos, length, "UTF-8");
-        int id = Integer.valueOf(serverIdString);
-        pos += length + 1;
-
-        /* Read the generation id */
-        length = getNextLength(in, pos);
-        long generationId =
-          Long.valueOf(new String(in, pos, length,
-          "UTF-8"));
-        pos += length + 1;
-
-        /* Read RS group id */
-        byte groupId = in[pos++];
-
-        int weight = 1;
-        String serverUrl = null;
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          length = getNextLength(in, pos);
-          serverUrl = new String(in, pos, length, "UTF-8");
-          pos += length + 1;
-
-          /* Read RS weight */
-          length = getNextLength(in, pos);
-          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
-          pos += length + 1;
-        }
-
-        /* Now create RSInfo and store it in list */
-
-        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
-          weight);
-        rsList.add(rsInfo);
-
-        nRsInfo--;
-      }
-
-    } catch (UnsupportedEncodingException e)
-    {
-      throw new DataFormatException("UTF-8 is not supported by this jvm.");
-    }
-  }
 
   /**
    * {@inheritDoc}
diff --git a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index d74a43c..862ec67 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -52,22 +53,23 @@
 public class MonitoringPublisher extends DirectoryThread
 {
 
-  private boolean shutdown = false;
+  private volatile boolean shutdown = false;
+
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
   // The domain we send monitoring for
-  private ReplicationServerDomain replicationServerDomain;
+  private final ReplicationServerDomain replicationServerDomain;
 
   // Sleep time (in ms) before sending new monitoring messages.
-  private long period = 3000;
+  private volatile long period;
 
   // Is the thread terminated ?
-  private boolean done = false;
+  private volatile boolean done = false;
 
-  private final Object sleeper = new Object();
+  private final Object shutdownLock = new Object();
 
   /**
    * Create a monitoring publisher.
@@ -104,9 +106,12 @@
       {
         try
         {
-          synchronized (sleeper)
+          synchronized (shutdownLock)
           {
-            sleeper.wait(period);
+            if (!shutdown)
+            {
+              shutdownLock.wait(period);
+            }
           }
         } catch (InterruptedException ex)
         {
@@ -157,16 +162,17 @@
    */
   public void shutdown()
   {
-    if (debugEnabled())
+    synchronized (shutdownLock)
     {
-      TRACER.debugInfo("Shutting down monitoring publisher for dn " +
-        replicationServerDomain.getBaseDn().toString() + " in RS " +
-        replicationServerDomain.getReplicationServer().getServerId());
-    }
-    shutdown = true;
-    synchronized (sleeper)
-    {
-      sleeper.notify();
+      shutdown = true;
+      shutdownLock.notifyAll();
+
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Shutting down monitoring publisher for dn " +
+          replicationServerDomain.getBaseDn().toString() + " in RS " +
+          replicationServerDomain.getReplicationServer().getServerId());
+      }
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 86e27fb..8c5ffb3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index cc9d667..64976d2 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 import org.opends.messages.*;
@@ -255,13 +256,10 @@
 
         status = cursor.getNext(key, data, LockMode.DEFAULT);
       }
-      cursor.close();
-
     }
-    catch (DatabaseException dbe)
+    finally
     {
       cursor.close();
-      throw dbe;
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
index b7ae03d..4f8447c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -38,7 +39,7 @@
   /**
    * The Replication Server that created this thread.
    */
-  private ReplicationServer server;
+  private final ReplicationServer server;
 
   /**
    * Creates a new instance of this directory thread with the
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
index 8a3cf1e..e2e6ccf 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -38,7 +39,7 @@
   /**
    * The Replication Server that created this thread.
    */
-  private ReplicationServer server;
+  private final ReplicationServer server;
 
   /**
    * Creates a new instance of this directory thread with the
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 0e0d14c..4cefcf4 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -58,8 +59,8 @@
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-  private ProtocolSession session;
-  private ServerHandler handler;
+  private final ProtocolSession session;
+  private final ServerHandler handler;
 
   /**
    * Constructor for the LDAP server reader part of the replicationServer.
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 1bdce8c..8632baa 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 import org.opends.messages.Message;
@@ -57,10 +58,10 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private ProtocolSession session;
-  private ServerHandler handler;
-  private ReplicationServerDomain replicationServerDomain;
-  private short protocolVersion = -1;
+  private final ProtocolSession session;
+  private final ServerHandler handler;
+  private final ReplicationServerDomain replicationServerDomain;
+  private final short protocolVersion;
 
   /**
    * Create a ServerWriter.
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index a48d9ff..f7a5705 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -47,21 +48,22 @@
 public class StatusAnalyzer extends DirectoryThread
 {
 
-  private boolean finished = false;
+  private volatile boolean shutdown = false;
+
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private ReplicationServerDomain replicationServerDomain;
-  private int degradedStatusThreshold = -1;
+  private final ReplicationServerDomain replicationServerDomain;
+  private volatile int degradedStatusThreshold = -1;
 
   // Sleep time for the thread, in ms.
-  private int STATUS_ANALYZER_SLEEP_TIME = 5000;
+  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
 
-  private boolean done = false;
+  private volatile boolean done = false;
 
-  private Object sleeper = new Object();
+  private final Object shutdownLock = new Object();
 
   /**
    * Create a StatusAnalyzer.
@@ -95,13 +97,16 @@
     }
 
     boolean interrupted = false;
-    while (!finished && !interrupted)
+    while (!shutdown && !interrupted)
     {
       try
       {
-        synchronized (sleeper)
+        synchronized (shutdownLock)
         {
-          sleeper.wait(STATUS_ANALYZER_SLEEP_TIME);
+          if (!shutdown)
+          {
+            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+          }
         }
       } catch (InterruptedException ex)
       {
@@ -192,16 +197,17 @@
    */
   public void shutdown()
   {
-    if (debugEnabled())
+    synchronized (shutdownLock)
     {
-      TRACER.debugInfo("Shutting down status analyzer for dn " +
-        replicationServerDomain.getBaseDn().toString() + " in RS " +
-        replicationServerDomain.getReplicationServer().getServerId());
-    }
-    finished = true;
-    synchronized (sleeper)
-    {
-      sleeper.notify();
+      shutdown = true;
+      shutdownLock.notifyAll();
+
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("Shutting down status analyzer for dn "
+            + replicationServerDomain.getBaseDn().toString() + " in RS "
+            + replicationServerDomain.getReplicationServer().getServerId());
+      }
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index e301b53..597a6d3 100644
--- a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 
 package org.opends.server.replication.service;
@@ -51,26 +52,21 @@
   private static final DebugTracer TRACER = getTracer();
 
   /**
-   * For test purposes only to simulate loss of heartbeats.
-   */
-  static private boolean heartbeatsDisabled = false;
-
-  /**
    * The session on which heartbeats are to be sent.
    */
-  private ProtocolSession session;
+  private final ProtocolSession session;
 
   /**
    * The time in milliseconds between heartbeats.
    */
-  private long heartbeatInterval;
-  private int serverId;
+  private final long heartbeatInterval;
+  private final int serverId;
 
   /**
    * Set this to stop the thread.
    */
-  private Boolean shutdown = false;
-  private final Object shutdown_lock = new Object();
+  private volatile boolean shutdown = false;
+  private final Object shutdownLock = new Object();
 
   /**
    * Create a heartbeat thread.
@@ -112,10 +108,7 @@
 
         if (now > session.getLastPublishTime() + heartbeatInterval)
         {
-          if (!heartbeatsDisabled)
-          {
-            session.publish(ctHeartbeatMsg);
-          }
+          session.publish(ctHeartbeatMsg);
         }
 
         try
@@ -127,11 +120,11 @@
             sleepTime = heartbeatInterval;
           }
 
-          synchronized (shutdown_lock)
+          synchronized (shutdownLock)
           {
             if (!shutdown)
             {
-              shutdown_lock.wait(sleepTime);
+              shutdownLock.wait(sleepTime);
             }
           }
         }
@@ -166,20 +159,10 @@
    */
   public void shutdown()
   {
-    synchronized (shutdown_lock)
+    synchronized (shutdownLock)
     {
       shutdown = true;
-      shutdown_lock.notifyAll();
+      shutdownLock.notifyAll();
     }
   }
-
-
-  /**
-   * For testing purposes only to simulate loss of heartbeats.
-   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
-   */
-  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
-  {
-    CTHeartbeatPublisherThread.heartbeatsDisabled = heartbeatsDisabled;
-  }
 }
diff --git a/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
index dd48f8a..dab9fcf 100644
--- a/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.service;
 import org.opends.messages.Message;
@@ -48,9 +49,9 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private ReplicationDomain repDomain;
-  private boolean shutdown = false;
-  private boolean done = false;
+  private final ReplicationDomain repDomain;
+  private volatile boolean shutdown = false;
+  private volatile boolean done = false;
 
 
   /**
@@ -95,11 +96,17 @@
         while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
         {
           if (repDomain.processUpdate(updateMsg) == true)
+          {
             repDomain.processUpdateDoneSynchronous(updateMsg);
+          }
         }
+
         if (updateMsg == null)
+        {
           shutdown = true;
-      } catch (Exception e)
+        }
+      }
+      catch (Exception e)
       {
         /*
          * catch all exceptions happening in repDomain.receive so that the
@@ -119,6 +126,8 @@
     }
   }
 
+
+
   /**
    * Wait for the completion of this thread.
    */
@@ -134,12 +143,13 @@
         n++;
         if (n >= FACTOR)
         {
-          TRACER.debugInfo("Interrupting listener thread for dn " +
-            repDomain.getServiceID() + " in DS " + repDomain.getServerId());
+          TRACER.debugInfo("Interrupting listener thread for dn "
+              + repDomain.getServiceID() + " in DS " + repDomain.getServerId());
           this.interrupt();
         }
       }
-    } catch (InterruptedException e)
+    }
+    catch (InterruptedException e)
     {
       // exit the loop if this thread is interrupted.
     }
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d8798bf..cec9b3d 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -51,6 +51,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -97,11 +98,11 @@
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-  private boolean shutdown = false;
-  private Collection<String> servers;
-  private boolean connected = false;
-  private String replicationServer = "Not connected";
-  private ProtocolSession session = null;
+  private volatile boolean shutdown = false;
+  private volatile Collection<String> servers;
+  private volatile boolean connected = false;
+  private volatile String replicationServer = "Not connected";
+  private volatile ProtocolSession session = null;
   private final ServerState state;
   private final String baseDn;
   private final int serverId;
@@ -156,7 +157,7 @@
    * and to know that it is necessary to print a new message when the broker
    * finally succeed to connect.
    */
-  private boolean connectionError = false;
+  private volatile boolean connectionError = false;
   private final Object connectPhaseLock = new Object();
   /**
    * The thread that publishes messages to the RS containing the current
@@ -173,18 +174,20 @@
    */
   // Info for other DSs.
   // Warning: does not contain info for us (for our server id)
-  private List<DSInfo> dsList = new ArrayList<DSInfo>();
-  private long generationID;
-  private int updateDoneCount = 0;
-  private boolean connectRequiresRecovery = false;
+  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
+  private volatile long generationID;
+  private volatile int updateDoneCount = 0;
+  private volatile boolean connectRequiresRecovery = false;
+
   /**
    * The map of replication server info initialized at connection time and
    * regularly updated. This is used to decide to which best suitable
-   * replication server one wants to connect.
-   * Key: replication server id
-   * Value: replication server info for the matching replication server id
+   * replication server one wants to connect. Key: replication server id Value:
+   * replication server info for the matching replication server id
    */
-  private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
+  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
+    = null;
+
   /**
    * This integer defines when the best replication server checking algorithm
    * should be engaged.
@@ -769,7 +772,7 @@
   {
 
     Map<Integer, ReplicationServerInfo> rsInfos =
-      new HashMap<Integer, ReplicationServerInfo>();
+      new ConcurrentHashMap<Integer, ReplicationServerInfo>();
 
     for (String server : servers)
     {
@@ -2535,8 +2538,6 @@
    * called in a single thread or protected by a locking mechanism
    * before being called.
    *
-   * @throws SocketTimeoutException if the timeout set by setSoTimeout
-   *         has expired
    * @param reconnectToTheBestRS Whether broker will automatically switch
    *                             to the best suitable RS.
    * @param reconnectOnFailure   Whether broker will automatically reconnect
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index d549cd7..2c0856a 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -124,8 +125,8 @@
  *   must use the {@link #publish(UpdateMsg)} method.
  * <p>
  *   If the Full Initialization process is needed then implementation
- *   for {@link #importBackend(InputStream)} and
- *   {@link #exportBackend(OutputStream)} must be
+ *   for {@code importBackend(InputStream)} and
+ *   {@code exportBackend(OutputStream)} must be
  *   provided.
  * <p>
  *   Full Initialization of a replica can be triggered by LDAP clients
@@ -1063,19 +1064,27 @@
     private final int serverToInitialize;
     private final int initWindow;
 
+
+
     /**
      * Constructor for the ExportThread.
      *
-     * @param serverToInitialize serverId of server that will receive entries
+     * @param serverToInitialize
+     *          serverId of server that will receive entries
+     * @param initWindow
+     *          The value of the initialization window for flow control between
+     *          the importer and the exporter.
      */
     public ExportThread(int serverToInitialize, int initWindow)
     {
-      super("Export thread from serverId=" + serverID
-          + " to serverId=" + serverToInitialize);
+      super("Export thread from serverId=" + serverID + " to serverId="
+          + serverToInitialize);
       this.serverToInitialize = serverToInitialize;
       this.initWindow = initWindow;
     }
 
+
+
     /**
      * Run method for this class.
      */
@@ -1342,11 +1351,8 @@
    * @return The source as a integer value
    * @throws DirectoryException if the string is not valid
    */
-  public int decodeTarget(String targetString)
-  throws DirectoryException
+  public int decodeTarget(String targetString) throws DirectoryException
   {
-    int  target = 0;
-    Throwable cause;
     if (targetString.equalsIgnoreCase("all"))
     {
       return RoutableMsg.ALL_SERVERS;
@@ -1355,34 +1361,26 @@
     // So should be a serverID
     try
     {
-      target = Integer.decode(targetString);
+      int target = Integer.decode(targetString);
       if (target >= 0)
       {
         // FIXME Could we check now that it is a know server in the domain ?
       }
       return target;
     }
-    catch(Exception e)
+    catch (Exception e)
     {
-      cause = e;
+      ResultCode resultCode = ResultCode.OTHER;
+      Message message = ERR_INVALID_EXPORT_TARGET.get();
+      throw new DirectoryException(resultCode, message, e);
     }
-    ResultCode resultCode = ResultCode.OTHER;
-    Message message = ERR_INVALID_EXPORT_TARGET.get();
-
-    if (cause != null)
-      throw new DirectoryException(
-          resultCode, message, cause);
-    else
-      throw new DirectoryException(
-          resultCode, message);
-
   }
 
   /**
    * Initializes a remote server from this server.
    * <p>
-   * The {@link #exportBackend(OutputStream)} will therefore be called
-   * on this server, and the {@link #importBackend(InputStream)}
+   * The {@code exportBackend(OutputStream)} will therefore be called
+   * on this server, and the {@code importBackend(InputStream)}
    * will be called on the remote server.
    * <p>
    * The InputStream and OutpuStream given as a parameter to those
@@ -2138,8 +2136,8 @@
    * When this method is called, a request for initialization will
    * be sent to the source server asking for initialization.
    * <p>
-   * The {@link #exportBackend(OutputStream)} will therefore be called
-   * on the source server, and the {@link #importBackend(InputStream)}
+   * The {@code exportBackend(OutputStream)} will therefore be called
+   * on the source server, and the {@code importBackend(InputStream)}
    * will be called on his server.
    * <p>
    * The InputStream and OutpuStream given as a parameter to those
@@ -2161,8 +2159,8 @@
   /**
    * Initializes a remote server from this server.
    * <p>
-   * The {@link #exportBackend(OutputStream)} will therefore be called
-   * on this server, and the {@link #importBackend(InputStream)}
+   * The {@code exportBackend(OutputStream)} will therefore be called
+   * on this server, and the {@code importBackend(InputStream)}
    * will be called on the remote server.
    * <p>
    * The InputStream and OutpuStream given as a parameter to those

--
Gitblit v1.10.0