From f288afe3e9990b3dc060299083a8a3ba307ae2eb Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:08:01 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR

---
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java       |  268 ++++++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java                                        |  252 +++----
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |   72 -
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java                                  |    2 
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties                                                           |    2 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java                                               |   13 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java       |   69 +-
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties                                                           |    3 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java  |   87 +-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |  532 ++++++++++++-----
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java             |   16 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java                                               |   41 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java                             |    6 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java    |    8 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java     |    4 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java                 |    2 
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties                                                           |    2 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |  355 +++++------
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java    |    2 
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties                                                              |    2 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java             |    5 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java        |    2 
 22 files changed, 1,077 insertions(+), 668 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 9f3583f..f2d3447 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -347,7 +347,7 @@
  Bad msg id sequence during import. Expected:%s Actual:%s
 ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\
  The following servers did not acknowledge initialization in the expected \
- time. They are potentially down or too slow. Servers list: %s
+ time for domain %s. They are potentially down or too slow. Servers list: %s
 ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\
  The following servers did not end initialization being connected with the \
  right generation (%s). They are potentially stopped or too slow. \
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties
index d7fa9b5..b82de97 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties
@@ -123,6 +123,7 @@
 ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND_110=Die Konfiguration des Replikationsservers konnte nicht gefunden werden
 DEBUG_GOING_TO_SEARCH_FOR_CHANGES_111=Der Replikationsserver ist hinsichtlich unserer \u00c4nderungen versp\u00e4tet: fehlende \u00c4nderungen werden gesendet
 DEBUG_CHANGES_SENT_113=Alle fehlenden \u00c4nderungen wurden an den Replikationsserver gesendet
+<<<<<<< .working
 ERR_PUBLISHING_FAKE_OPS_114=Aufgefangene Ausnahme ver\u00f6ffentlicht Scheinvorg\u00e4nge f\u00fcr Dom\u00e4ne %s : %s
 ERR_COMPUTING_FAKE_OPS_115=Aufgefangene Ausnahme berechnet Scheinvorg\u00e4nge f\u00fcr Dom\u00e4ne %s f\u00fcr Replikationsserver %s : %s
 NOTE_SERVER_STATE_RECOVERY_117=ServerState-Wiederherstellung f\u00fcr Dom\u00e4ne %s, aktualisiert mit changeNumber %s
@@ -182,7 +183,7 @@
 ERR_INIT_IMPORT_FAILURE_190=W\u00e4hrend der Initialisierung von einem Remote-Server ist der folgende Fehler aufgetreten: %s
 ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Verbindungsfehler mit Replikationsserver %s w\u00e4hrend Import
 ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Ung\u00fcltige Meldungs-ID-Sequenz w\u00e4hrend Import. Erwartet: %s Tats\u00e4chlich: %s
-ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s
+ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt f\u00fcr Dom\u00e4ne %s. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s
 ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Die folgenden Server haben die Initialisierung nicht in Verbindung mit der entsprechenden Generation (%s) beendet. Sie wurden gestoppt oder sind zu langsam. Server-Liste: %s
 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Verbindung zu Replikationsserver mit Server-ID=%s w\u00e4hrend Initialisierung von Remote-Server(n) unterbrochen
 ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Initialisierter Server mit Server-ID=%s bei Initialisierung von Remote-Server(n) gestoppt oder zu langsam
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties
index c8c56cf..ee5506d 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties
@@ -182,7 +182,7 @@
 ERR_INIT_IMPORT_FAILURE_190=Durante la inicializaci\u00f3n desde un servidor remoto, se ha producido el siguiente error: %s
 ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Error de conexi\u00f3n con el Servidor de repetici\u00f3n %s durante la importaci\u00f3n
 ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Secuencia de Id. de mensaje incorrecto durante la importaci\u00f3n. Se requiere:%s Real:%s
-ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s
+ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado para el ND de base %s. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s
 ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Los siguientes servidores no finalizaron la inicializaci\u00f3n estando conectados con la generaci\u00f3n correcta (%s). Posiblemente est\u00e1n detenidos o son demasiado lentos. Lista de servidores: %s
 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Al inicializar los servidores remotos, se ha perdido la conexi\u00f3n con el Servidor de repetici\u00f3n con Id. de servidor=%s
 ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Al inicializar los servidores remotos, el servidor inicializado con Id. de servidor=%s posiblemente se ha detenido o es demasiado lento
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties
index c3f8c6e..147fba0 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties
@@ -182,7 +182,7 @@
 ERR_INIT_IMPORT_FAILURE_190=L'erreur suivante s'est produite lors de l'initialisation \u00e0 partir d'un serveur distant : %s
 ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\u00c9chec de la connexion au serveur de r\u00e9plication %s lors de l'importation
 ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Mauvaise s\u00e9quence d'ID de message lors de l'importation. Attendu\u00a0: %s Re\u00e7u\u00a0: %s
-ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s
+ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu pour le domaine %s. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s
 ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Les serveurs suivants n'ont pas termin\u00e9 l'initialisation en cours de connexion avec la bonne g\u00e9n\u00e9ration (%s). Ils sont probablement arr\u00eat\u00e9s ou trop lents. Liste des serveurs\u00a0: %s
 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=La connexion au serveur de r\u00e9plication ayant l'identifiant serverId=%s a \u00e9t\u00e9 interrompue lors de l'initalisation du/des serveur(s) distant(s)
 ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Le serveur initialis\u00e9 ayant l'identifiant serverId=%s \u00e9tait probablement arr\u00eat\u00e9 ou trop lent lors de l'initialisation du/des serveur(s) distant(s)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
index fb42a43..bb062fa 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
@@ -283,9 +283,9 @@
         && equals(eclIncludesForDeletes, dsInfo.getEclIncludesForDeletes());
   }
 
-  private boolean equals(Set<String> o1, Set<String> o2)
+  private boolean equals(Object o1, Object o2)
   {
-    return (o1 == null && o2 == null) || (o1 != null && o1.equals(o2));
+    return o1 == null ? o2 == null : o1.equals(o2);
   }
 
   /**
@@ -320,15 +320,18 @@
   @Override
   public String toString()
   {
-    StringBuilder sb = new StringBuilder();
+    final StringBuilder sb = new StringBuilder();
     sb.append("DS id: ").append(dsId);
     sb.append(" ; DS url: ").append(dsUrl);
     sb.append(" ; RS id: ").append(rsId);
     sb.append(" ; Generation id: ").append(generationId);
     sb.append(" ; Status: ").append(status);
     sb.append(" ; Assured replication: ").append(assuredFlag);
-    sb.append(" ; Assured mode: ").append(assuredMode);
-    sb.append(" ; Safe data level: ").append(safeDataLevel);
+    if (assuredFlag)
+    {
+      sb.append(" ; Assured mode: ").append(assuredMode);
+      sb.append(" ; Safe data level: ").append(safeDataLevel);
+    }
     sb.append(" ; Group id: ").append(groupId);
     sb.append(" ; Protocol version: ").append(protocolVersion);
     sb.append(" ; Referral URLs: ").append(refUrls);
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
index c1aeea7..a101dd6 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2012-2013 ForgeRock AS
+ *      Portions Copyright 2012-2014 ForgeRock AS
  */
 package org.opends.server.replication.common;
 
@@ -36,7 +36,7 @@
 public final class RSInfo
 {
   /** Server id of the RS. */
-  private final int id;
+  private final int rsServerId;
   /** Generation Id of the RS. */
   private final long generationId;
   /** Group id of the RS. */
@@ -50,22 +50,22 @@
    */
   private final int weight;
   /** The server URL of the RS. */
-  private final String serverUrl;
+  private final String rsServerURL;
 
   /**
    * Creates a new instance of RSInfo with every given info.
    *
-   * @param id The RS id
-   * @param serverUrl Url of the RS
+   * @param rsServerId The RS id
+   * @param rsServerURL Url of the RS
    * @param generationId The generation id the RS is using
    * @param groupId RS group id
    * @param weight RS weight
    */
-  public RSInfo(int id, String serverUrl,
+  public RSInfo(int rsServerId, String rsServerURL,
     long generationId, byte groupId, int weight)
   {
-    this.id = id;
-    this.serverUrl = serverUrl;
+    this.rsServerId = rsServerId;
+    this.rsServerURL = rsServerURL;
     this.generationId = generationId;
     this.groupId = groupId;
     this.weight = weight;
@@ -77,7 +77,7 @@
    */
   public int getId()
   {
-    return id;
+    return rsServerId;
   }
 
   /**
@@ -125,12 +125,10 @@
       return false;
     }
     final RSInfo rsInfo = (RSInfo) obj;
-    return id == rsInfo.getId()
+    return rsServerId == rsInfo.getId()
         && generationId == rsInfo.getGenerationId()
         && groupId == rsInfo.getGroupId()
-        && weight == rsInfo.getWeight()
-        && ((serverUrl == null && rsInfo.getServerUrl() == null)
-            || (serverUrl != null && serverUrl.equals(rsInfo.getServerUrl())));
+        && weight == rsInfo.getWeight();
   }
 
   /**
@@ -141,11 +139,10 @@
   public int hashCode()
   {
     int hash = 7;
-    hash = 17 * hash + this.id;
+    hash = 17 * hash + this.rsServerId;
     hash = 17 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
     hash = 17 * hash + this.groupId;
     hash = 17 * hash + this.weight;
-    hash = 17 * hash + (this.serverUrl != null ? this.serverUrl.hashCode() : 0);
     return hash;
   }
 
@@ -155,7 +152,7 @@
    */
   public String getServerUrl()
   {
-    return serverUrl;
+    return rsServerURL;
   }
 
   /**
@@ -165,12 +162,10 @@
   @Override
   public String toString()
   {
-    StringBuilder sb = new StringBuilder();
-    sb.append("Id: ").append(id);
-    sb.append(" ; Server URL: ").append(serverUrl);
-    sb.append(" ; Generation id: ").append(generationId);
-    sb.append(" ; Group id: ").append(groupId);
-    sb.append(" ; Weight: ").append(weight);
-    return sb.toString();
+    return "RS id: " + rsServerId
+        + " ; RS URL: " + rsServerURL
+        + " ; Generation id: " + generationId
+        + " ; Group id: " + groupId
+        + " ; Weight: " + weight;
   }
 }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index d9e2e47..72b73d7 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -38,15 +38,14 @@
 import org.opends.server.replication.common.ServerStatus;
 
 /**
- *
  * This class defines a message that is sent:
  * - By a RS to the other RSs in the topology, containing:
- *   - the list of DSs directly connected to the RS in the DS list
- *   - only this RS in the RS list
+ *   - the DSs directly connected to the RS in the DS infos
+ *   - only this RS in the RS infos
  * - By a RS to his connected DSs, containing every DSs and RSs he knows.
  * In that case the message contains:
- *   - the list of every DS the RS knows except the destinator DS in the DS list
- *   - the list of every connected RSs (including the sending RS) in the RS list
+ *   - every DSs the RS knows except the destinator DS in the DS infos
+ *   - every connected RSs (including the sending RS) in the RS infos
  *
  * Exchanging these messages allows to have each RS or DS take
  * appropriate decisions according to the current topology:
@@ -56,10 +55,10 @@
  */
 public class TopologyMsg extends ReplicationMsg
 {
-  // Information for the DS known in the topology
-  private final List<DSInfo> dsList;
-  // Information for the RS known in the topology
-  private final List<RSInfo> rsList;
+  /** Information for the DSs (aka replicas) known in the topology. */
+  private final Map<Integer, DSInfo> replicaInfos;
+  /** Information for the RSs known in the topology. */
+  private final List<RSInfo> rsInfos;
 
   /**
    * Creates a new changelogInfo message from its encoded form.
@@ -77,18 +76,18 @@
       if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
       {
         throw new DataFormatException(
-          "Input is not a valid " + this.getClass().getCanonicalName());
+          "Input is not a valid " + getClass().getCanonicalName());
       }
 
       int pos = 1;
 
       /* Read number of following DS info entries */
-
       byte nDsInfo = in[pos++];
 
       /* Read the DS info entries */
-      List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
-      while ( (nDsInfo > 0) && (pos < in.length) )
+      Map<Integer, DSInfo> replicaInfos =
+          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+      while (nDsInfo > 0 && pos < in.length)
       {
         /* Read DS id */
         int length = getNextLength(in, pos);
@@ -110,26 +109,21 @@
         }
 
         /* Read RS id */
-        length =
-          getNextLength(in, pos);
-        serverIdString =
-          new String(in, pos, length, "UTF-8");
+        length = getNextLength(in, pos);
+        serverIdString = new String(in, pos, length, "UTF-8");
         int rsId = Integer.valueOf(serverIdString);
         pos += length + 1;
 
         /* Read the generation id */
         length = getNextLength(in, pos);
-        long generationId =
-          Long.valueOf(new String(in, pos, length,
-          "UTF-8"));
+        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
         pos += length + 1;
 
         /* Read DS status */
         ServerStatus status = ServerStatus.valueOf(in[pos++]);
 
         /* Read DS assured flag */
-        boolean assuredFlag;
-        assuredFlag = in[pos++] == 1;
+        boolean assuredFlag = in[pos++] == 1;
 
         /* Read DS assured mode */
         AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -142,50 +136,18 @@
 
         /* Read number of referrals URLs */
         List<String> refUrls = new ArrayList<String>();
-        byte nUrls = in[pos++];
-        byte nRead = 0;
-        /* Read urls until expected number read */
-        while ((nRead != nUrls) &&
-          (pos < in.length) //security
-          )
-        {
-          length = getNextLength(in, pos);
-          String url = new String(in, pos, length, "UTF-8");
-          refUrls.add(url);
-          pos += length + 1;
-          nRead++;
-        }
+        pos = readStrings(in, pos, refUrls);
 
         Set<String> attrs = new HashSet<String>();
         Set<String> delattrs = new HashSet<String>();
         short protocolVersion = -1;
         if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
-          byte nAttrs = in[pos++];
-          nRead = 0;
-          /* Read attrs until expected number read */
-          while ((nRead != nAttrs) && (pos < in.length))
-          {
-            length = getNextLength(in, pos);
-            String attr = new String(in, pos, length, "UTF-8");
-            attrs.add(attr);
-            pos += length + 1;
-            nRead++;
-          }
+          pos = readStrings(in, pos, attrs);
 
           if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
           {
-            nAttrs = in[pos++];
-            nRead = 0;
-            /* Read attrs until expected number read */
-            while ((nRead != nAttrs) && (pos < in.length))
-            {
-              length = getNextLength(in, pos);
-              String attr = new String(in, pos, length, "UTF-8");
-              delattrs.add(attr);
-              pos += length + 1;
-              nRead++;
-            }
+            pos = readStrings(in, pos, delattrs);
           }
           else
           {
@@ -194,26 +156,23 @@
           }
 
           /* Read Protocol version */
-          protocolVersion = (short)in[pos++];
+          protocolVersion = in[pos++];
         }
 
-        /* Now create DSInfo and store it in list */
-
-        DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status,
-          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
-          delattrs, protocolVersion);
-        dsList.add(dsInfo);
+        /* Now create DSInfo and store it */
+        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
+            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
+            attrs, delattrs, protocolVersion));
 
         nDsInfo--;
       }
 
       /* Read number of following RS info entries */
-
       byte nRsInfo = in[pos++];
 
       /* Read the RS info entries */
-      List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
-      while ( (nRsInfo > 0) && (pos < in.length) )
+      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+      while (nRsInfo > 0 && pos < in.length)
       {
         /* Read RS id */
         int length = getNextLength(in, pos);
@@ -223,9 +182,7 @@
 
         /* Read the generation id */
         length = getNextLength(in, pos);
-        long generationId =
-          Long.valueOf(new String(in, pos, length,
-          "UTF-8"));
+        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
         pos += length + 1;
 
         /* Read RS group id */
@@ -245,47 +202,67 @@
           pos += length + 1;
         }
 
-        /* Now create RSInfo and store it in list */
-
-        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
-          weight);
-        rsList.add(rsInfo);
+        /* Now create RSInfo and store it */
+        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
 
         nRsInfo--;
       }
 
-      this.dsList = Collections.unmodifiableList(dsList);
-      this.rsList = Collections.unmodifiableList(rsList);
-    } catch (UnsupportedEncodingException e)
+      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+      this.rsInfos = Collections.unmodifiableList(rsInfos);
+    }
+    catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
     }
   }
 
-  /**
-   * Creates a new  message from a list of the currently connected servers.
-   *
-   * @param dsList The list of currently connected DS servers ID.
-   * @param rsList The list of currently connected RS servers ID.
-   */
-  public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
+  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
+      throws DataFormatException, UnsupportedEncodingException
   {
-    if (dsList == null || dsList.isEmpty())
+    byte nAttrs = in[pos++];
+    byte nRead = 0;
+    // Read all elements until expected number read
+    while (nRead != nAttrs && pos < in.length)
     {
-      this.dsList = Collections.emptyList();
+      int length = getNextLength(in, pos);
+      outputCol.add(new String(in, pos, length, "UTF-8"));
+      pos += length + 1;
+      nRead++;
+    }
+    return pos;
+  }
+
+  /**
+   * Creates a new  message of the currently connected servers.
+   *
+   * @param dsInfos The collection of currently connected DS servers ID.
+   * @param rsInfos The list of currently connected RS servers ID.
+   */
+  public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
+  {
+    if (dsInfos == null || dsInfos.isEmpty())
+    {
+      this.replicaInfos = Collections.emptyMap();
     }
     else
     {
-      this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+      Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>();
+      for (DSInfo dsInfo : dsInfos)
+      {
+        replicas.put(dsInfo.getDsId(), dsInfo);
+      }
+      this.replicaInfos = Collections.unmodifiableMap(replicas);
     }
 
-    if (rsList == null || rsList.isEmpty())
+    if (rsInfos == null || rsInfos.isEmpty())
     {
-      this.rsList = Collections.emptyList();
+      this.rsInfos = Collections.emptyList();
     }
     else
     {
-      this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+      this.rsInfos =
+          Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
     }
   }
 
@@ -293,12 +270,9 @@
   // Msg encoding
   // ============
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
-  public byte[] getBytes(short version)
-  throws UnsupportedEncodingException
+  public byte[] getBytes(short version) throws UnsupportedEncodingException
   {
     try
     {
@@ -313,10 +287,10 @@
       oStream.write(MSG_TYPE_TOPOLOGY);
 
       // Put number of following DS info entries
-      oStream.write((byte)dsList.size());
+      oStream.write((byte) replicaInfos.size());
 
       // Put DS info
-      for (DSInfo dsInfo : dsList)
+      for (DSInfo dsInfo : replicaInfos.values())
       {
         // Put DS id
         byte[] byteServerId =
@@ -330,8 +304,7 @@
           oStream.write(0);
         }
         // Put RS id
-        byteServerId =
-          String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
+        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
         oStream.write(byteServerId);
         oStream.write(0);
         // Put the generation id
@@ -349,36 +322,16 @@
         // Put DS group id
         oStream.write(dsInfo.getGroupId());
 
-        List<String> refUrls = dsInfo.getRefUrls();
-        // Put number of following URLs as a byte
-        oStream.write(refUrls.size());
-        for (String url : refUrls)
-        {
-          // Write the url and a 0 terminating byte
-          oStream.write(url.getBytes("UTF-8"));
-          oStream.write(0);
-        }
+        writeStrings(oStream, dsInfo.getRefUrls());
 
         if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
           // Put ECL includes
-          Set<String> attrs = dsInfo.getEclIncludes();
-          oStream.write(attrs.size());
-          for (String attr : attrs)
-          {
-            oStream.write(attr.getBytes("UTF-8"));
-            oStream.write(0);
-          }
+          writeStrings(oStream, dsInfo.getEclIncludes());
 
           if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
           {
-            Set<String> delattrs = dsInfo.getEclIncludesForDeletes();
-            oStream.write(delattrs.size());
-            for (String attr : delattrs)
-            {
-              oStream.write(attr.getBytes("UTF-8"));
-              oStream.write(0);
-            }
+            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
           }
 
           oStream.write(dsInfo.getProtocolVersion());
@@ -386,10 +339,10 @@
       }
 
       // Put number of following RS info entries
-      oStream.write((byte)rsList.size());
+      oStream.write((byte) rsInfos.size());
 
       // Put RS info
-      for (RSInfo rsInfo : rsList)
+      for (RSInfo rsInfo : rsInfos)
       {
         // Put RS id
         byte[] byteServerId =
@@ -422,54 +375,65 @@
       // never happens
       throw new RuntimeException(e);
     }
-
   }
 
+  private void writeStrings(ByteArrayOutputStream oStream,
+      Collection<String> col) throws IOException, UnsupportedEncodingException
+  {
+    // Put collection length as a byte
+    oStream.write(col.size());
+    for (String elem : col)
+    {
+      // Write the element and a 0 terminating byte
+      oStream.write(elem.getBytes("UTF-8"));
+      oStream.write(0);
+    }
+  }
 
-
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public String toString()
   {
     String dsStr = "";
-    for (DSInfo dsInfo : dsList)
+    for (DSInfo dsInfo : replicaInfos.values())
     {
-      dsStr += dsInfo.toString() + "\n----------------------------\n";
+      dsStr += dsInfo + "\n----------------------------\n";
     }
 
     String rsStr = "";
-    for (RSInfo rsInfo : rsList)
+    for (RSInfo rsInfo : rsInfos)
     {
-      rsStr += rsInfo.toString() + "\n----------------------------\n";
+      rsStr += rsInfo + "\n----------------------------\n";
     }
 
-    return ("TopologyMsg content: "
+    return "TopologyMsg content:"
       + "\n----------------------------"
       + "\nCONNECTED DS SERVERS:"
       + "\n--------------------\n"
       + dsStr
       + "CONNECTED RS SERVERS:"
       + "\n--------------------\n"
-      + rsStr + (rsStr.equals("") ? "----------------------------\n" : ""));
+      + rsStr
+      + (rsStr.equals("") ? "----------------------------\n" : "");
   }
 
   /**
-   * Get the list of DS info.
-   * @return The list of DS info
+   * Get the DS infos.
+   *
+   * @return The DS infos
    */
-  public List<DSInfo> getDsList()
+  public Map<Integer, DSInfo> getReplicaInfos()
   {
-    return dsList;
+    return replicaInfos;
   }
 
   /**
-   * Get the list of RS info.
-   * @return The list of RS info
+   * Get the RS infos.
+   *
+   * @return The RS infos
    */
-  public List<RSInfo> getRsList()
+  public List<RSInfo> getRsInfos()
   {
-    return rsList;
+    return rsInfos;
   }
 }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 6f0d924..4928623 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -444,7 +444,7 @@
     if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
     {
       // List should only contain RS info for sender
-      RSInfo rsInfo = inTopoMsg.getRsList().get(0);
+      RSInfo rsInfo = inTopoMsg.getRsInfos().get(0);
       weight = rsInfo.getWeight();
     }
 
@@ -579,7 +579,7 @@
   public void processTopoInfoFromRS(TopologyMsg topoMsg)
   {
     // List should only contain RS info for sender
-    final RSInfo rsInfo = topoMsg.getRsList().get(0);
+    final RSInfo rsInfo = topoMsg.getRsInfos().get(0);
     generationId = rsInfo.getGenerationId();
     groupId = rsInfo.getGroupId();
     weight = rsInfo.getWeight();
@@ -589,7 +589,7 @@
       clearRemoteLSHandlers();
 
       // Creates the new structure according to the message received.
-      for (DSInfo dsInfo : topoMsg.getDsList())
+      for (DSInfo dsInfo : topoMsg.getReplicaInfos().values())
       {
         // For each DS connected to the peer RS
         DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId);
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c5f305f..d9e6fca 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -173,11 +173,7 @@
   // @NotNull // for the reference
   private final AtomicReference<ConnectedRS> connectedRS =
       new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
-  /**
-   * Our replication domain.
-   * <p>
-   * Can be null for unit test purpose.
-   */
+  /** Our replication domain. */
   private ReplicationDomain domain;
   /**
    * This object is used as a conditional event to be notified about
@@ -217,25 +213,14 @@
   /*
    * Properties for the last topology info received from the network.
    */
-  /**
-   * Info for other DSs.
-   * <p>
-   * Warning: does not contain info for us (for our server id)
-   */
-  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
-  private volatile long generationID;
+  /** Contains the last known state of the replication topology. */
+  private final AtomicReference<Topology> topology =
+      new AtomicReference<Topology>(new Topology());
+  /** <pre>@GuardedBy("this")</pre>. */
   private volatile int updateDoneCount = 0;
   private volatile boolean connectRequiresRecovery = false;
 
   /**
-   * The map of replication server info initialized at connection time and
-   * regularly updated. This is used to decide to which best suitable
-   * replication server one wants to connect. Key: replication server id Value:
-   * replication server info for the matching replication server id
-   */
-  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
-
-  /**
    * This integer defines when the best replication server checking algorithm
    * should be engaged.
    * Every time a monitoring message (each monitoring publisher period) is
@@ -266,19 +251,16 @@
    * @param state The ServerState that should be used by this broker
    *        when negotiating the session with the replicationServer.
    * @param config The configuration to use.
-   * @param generationId The generationId for the server associated to the
-   * provided serverId and for the domain associated to the provided baseDN.
    * @param replSessionSecurity The session security configuration.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
-      ServerState state, ReplicationDomainCfg config, long generationId,
+      ServerState state, ReplicationDomainCfg config,
       ReplSessionSecurity replSessionSecurity)
   {
     this.domain = replicationDomain;
     this.state = state;
     this.config = config;
     this.replSessionSecurity = replSessionSecurity;
-    this.generationID = generationId;
     this.rcvWindow = getMaxRcvWindow();
     this.halfRcvWindow = rcvWindow / 2;
 
@@ -352,8 +334,7 @@
    */
   private long getGenerationID()
   {
-    generationID = domain.getGenerationID();
-    return generationID;
+    return domain.getGenerationID();
   }
 
   /**
@@ -362,38 +343,7 @@
    */
   public void setGenerationID(long generationID)
   {
-    this.generationID = generationID;
-  }
-
-  /**
-   * Sets the locally configured flag for the passed ReplicationServerInfo
-   * object, analyzing the local configuration.
-   * @param rsInfo the Replication server to check and update
-   */
-  private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
-  {
-    // Determine if the passed ReplicationServerInfo has a URL that is present
-    // in the locally configured replication servers
-    String rsUrl = rsInfo.getServerURL();
-    if (rsUrl == null)
-    {
-      // The ReplicationServerInfo has been generated from a server with
-      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
-      // ignore this server as we do not know how to connect to it
-      rsInfo.setLocallyConfigured(false);
-      return;
-    }
-    for (String serverUrl : getReplicationServerUrls())
-    {
-      if (isSameReplicationServerUrl(serverUrl, rsUrl))
-      {
-        // This RS is locally configured, mark this
-        rsInfo.setLocallyConfigured(true);
-        rsInfo.setServerURL(serverUrl);
-        return;
-      }
-    }
-    rsInfo.setLocallyConfigured(false);
+    domain.setGenerationID(generationID);
   }
 
   /**
@@ -485,7 +435,7 @@
 
       // Unsupported message type: should not happen
       throw new IllegalArgumentException("Unexpected PDU type: "
-          + msg.getClass().getName() + " :\n" + msg);
+          + msg.getClass().getName() + ":\n" + msg);
     }
 
     /**
@@ -733,8 +683,10 @@
     @Override
     public String toString()
     {
-      return "Url:" + getServerURL() + " ServerId:" + getServerId()
-          + " GroupId:" + getGroupId();
+      return "ReplServerInfo Url:" + getServerURL()
+          + " ServerId:" + getServerId()
+          + " GroupId:" + getGroupId()
+          + " connectedDSs:" + connectedDSs;
     }
   }
 
@@ -860,9 +812,11 @@
             + "elect the preferred one");
 
       // Get info from every available replication servers
-      replicationServerInfos = collectReplicationServersInfo();
+      Map<Integer, ReplicationServerInfo> rsInfos =
+          collectReplicationServersInfo();
+      computeNewTopology(toRSInfos(rsInfos));
 
-      if (replicationServerInfos.isEmpty())
+      if (rsInfos.isEmpty())
       {
         setConnectedRS(ConnectedRS.noConnectedRS());
       }
@@ -870,7 +824,7 @@
       {
         // At least one server answered, find the best one.
         RSEvaluations evals = computeBestReplicationServer(true, -1, state,
-            replicationServerInfos, serverId, getGroupId(), getGenerationID());
+            rsInfos, serverId, getGroupId(), getGenerationID());
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (logger.isTraceEnabled())
@@ -886,8 +840,7 @@
           Update replication server info with potentially more up to date
           data (server state for instance may have changed)
           */
-          replicationServerInfos
-              .put(electedRsInfo.getServerId(), electedRsInfo);
+          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);
 
           // Handshake phase 1 exchange went well
 
@@ -935,10 +888,10 @@
           connectionError = true;
           connectPhaseLock.notify();
 
-          if (replicationServerInfos.size() > 0)
+          if (rsInfos.size() > 0)
           {
             logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN.toNormalizedString(),
-                Utils.joinAsString(", ", replicationServerInfos.keySet()));
+                Utils.joinAsString(", ", rsInfos.keySet()));
           }
           else
           {
@@ -949,6 +902,43 @@
     }
   }
 
+  private void computeNewTopology(List<RSInfo> newRSInfos)
+  {
+    final int rsServerId = getRsServerId();
+
+    Topology oldTopo;
+    Topology newTopo;
+    do
+    {
+      oldTopo = topology.get();
+      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
+          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
+    }
+    while (!topology.compareAndSet(oldTopo, newTopo));
+
+    if (logger.isTraceEnabled())
+    {
+      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
+    }
+  }
+
+  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
+      Topology newTopo)
+  {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("rsServerId=").append(rsServerId);
+    if (newTopo.equals(oldTopo))
+    {
+      sb.append(", unchangedTopology=").append(newTopo);
+    }
+    else
+    {
+      sb.append(", oldTopology=").append(oldTopo);
+      sb.append(", newTopology=").append(newTopo);
+    }
+    return sb;
+  }
+
   /**
    * Connects to a replication server.
    *
@@ -2303,7 +2293,7 @@
     if (logger.isTraceEnabled())
     {
       debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
-          + rs.getServerId() + ") genId=" + generationID);
+          + rs.getServerId() + ") genId=" + getGenerationID());
     }
   }
 
@@ -2408,7 +2398,8 @@
           */
           credit =
             currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
-        } else
+        }
+        else
         {
           credit = true;
         }
@@ -2451,6 +2442,11 @@
       }
       catch (IOException e)
       {
+        if (logger.isTraceEnabled())
+        {
+          debugInfo("publish(): IOException caught: "
+              + stackTraceToSingleLineString(e));
+        }
         if (!retryOnFailure)
         {
           return false;
@@ -2463,23 +2459,24 @@
           try
           {
             connectPhaseLock.wait(100);
-          } catch (InterruptedException e1)
+          }
+          catch (InterruptedException ignored)
           {
-            // ignore
             if (logger.isTraceEnabled())
             {
-              debugInfo("publish(): Interrupted exception raised : "
-                  + e.getLocalizedMessage());
+              debugInfo("publish(): InterruptedException caught 1: "
+                  + stackTraceToSingleLineString(ignored));
             }
           }
         }
-      } catch (InterruptedException e)
+      }
+      catch (InterruptedException ignored)
       {
         // just loop.
         if (logger.isTraceEnabled())
         {
-          debugInfo("publish(): Interrupted exception raised."
-              + e.getLocalizedMessage());
+          debugInfo("publish(): InterruptedException caught 2: "
+              + stackTraceToSingleLineString(ignored));
         }
       }
     }
@@ -2607,9 +2604,10 @@
           }
 
           // Update the replication servers ServerStates with new received info
+          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
           for (int srvId : toIterable(monitorMsg.rsIterator()))
           {
-            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
+            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
             if (rsInfo != null)
             {
               rsInfo.update(monitorMsg.getRSServerState(srvId));
@@ -2629,9 +2627,9 @@
             {
               // Stable topology (no topo msg since few seconds): proceed with
               // best server checking.
-              final RSEvaluations evals =
-                  computeBestReplicationServer(false, previousRsServerID, state,
-                  replicationServerInfos, serverId, getGroupId(), generationID);
+              final RSEvaluations evals = computeBestReplicationServer(
+                  false, previousRsServerID, state,
+                  rsInfos, serverId, getGroupId(), getGenerationID());
               final ReplicationServerInfo bestServerInfo = evals.getBestRS();
               if (previousRsServerID != -1
                   && (bestServerInfo == null
@@ -2951,9 +2949,9 @@
    * Gets the info for DSs in the topology (except us).
    * @return The info for DSs in the topology (except us)
    */
-  public List<DSInfo> getDsList()
+  public Map<Integer, DSInfo> getReplicaInfos()
   {
-    return dsList;
+    return topology.get().replicaInfos;
   }
 
   /**
@@ -2962,10 +2960,15 @@
    * @return The info for RSs in the topology (except the one we are connected
    * to)
    */
-  public List<RSInfo> getRsList()
+  public List<RSInfo> getRsInfos()
+  {
+    return toRSInfos(topology.get().rsInfos);
+  }
+
+  private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
   {
     final List<RSInfo> result = new ArrayList<RSInfo>();
-    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
+    for (ReplicationServerInfo rsInfo : rsInfos.values())
     {
       result.add(rsInfo.toRSInfo());
     }
@@ -2973,39 +2976,6 @@
   }
 
   /**
-   * Computes the list of DSs connected to a particular RS.
-   * @param rsId The RS id of the server one wants to know the connected DSs
-   * @param dsList The list of DSinfo from which to compute things
-   * @param rsServerId the serverId to use for the connectedDS
-   * @return The list of connected DSs to the server rsId
-   */
-  private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
-      int rsServerId)
-  {
-    final Set<Integer> connectedDSs = new HashSet<Integer>();
-    if (rsServerId == rsId)
-    {
-      /*
-      If we are computing connected DSs for the RS we are connected
-      to, we should count the local DS as the DSInfo of the local DS is not
-      sent by the replication server in the topology message. We must count
-      ourselves as a connected server.
-      */
-      connectedDSs.add(getServerId());
-    }
-
-    for (DSInfo dsInfo : dsList)
-    {
-      if (dsInfo.getRsId() == rsId)
-      {
-        connectedDSs.add(dsInfo.getDsId());
-      }
-    }
-
-    return connectedDSs;
-  }
-
-  /**
    * Processes an incoming TopologyMsg.
    * Updates the structures for the local view of the topology.
    *
@@ -3016,42 +2986,298 @@
    */
   private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
   {
-    if (logger.isTraceEnabled())
-      debugInfo("receive TopologyMsg=" + topoMsg);
-
-    // Store new DS list
-    dsList = topoMsg.getDsList();
-
-    // Update replication server info list with the received topology
-    // information
-    final Set<Integer> rssToKeep = new HashSet<Integer>();
-    for (RSInfo rsInfo : topoMsg.getRsList())
+    final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
+    for (DSInfo dsInfo : newTopo.replicaInfos.values())
     {
-      final int rsId = rsInfo.getId();
-      rssToKeep.add(rsId); // Mark this server as still existing
-      Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
-      ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
-      if (rsInfo2 == null)
-      {
-        // New replication server, create info for it add it to the list
-        rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
-        setLocallyConfiguredFlag(rsInfo2);
-        replicationServerInfos.put(rsId, rsInfo2);
-      }
-      else
-      {
-        // Update the existing info for the replication server
-        rsInfo2.update(rsInfo, connectedDSs);
-      }
+      domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
+          .getEclIncludesForDeletes());
+    }
+  }
+
+  private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
+  {
+    Topology oldTopo;
+    Topology newTopo;
+    do
+    {
+      oldTopo = topology.get();
+      newTopo = new Topology(topoMsg, getServerId(), rsServerId,
+              getReplicationServerUrls(), oldTopo.rsInfos);
+    }
+    while (!topology.compareAndSet(oldTopo, newTopo));
+
+    if (logger.isTraceEnabled())
+    {
+      final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
+      sb.append(" received TopologyMsg=").append(topoMsg);
+      debugInfo(sb);
+    }
+    return newTopo;
+  }
+
+  /**
+   * Contains the last known state of the replication topology.
+   */
+  static final class Topology
+  {
+
+    /**
+     * The RS's serverId that this DS was connected to when this topology state
+     * was computed.
+     */
+    private final int rsServerId;
+    /**
+     * Info for other DSs.
+     * <p>
+     * Warning: does not contain info for us (for our server id)
+     */
+    final Map<Integer, DSInfo> replicaInfos;
+    /**
+     * The map of replication server info initialized at connection time and
+     * regularly updated. This is used to decide to which best suitable
+     * replication server one wants to connect. Key: replication server id
+     * Value: replication server info for the matching replication server id
+     */
+    final Map<Integer, ReplicationServerInfo> rsInfos;
+
+    private Topology()
+    {
+      this.rsServerId = -1;
+      this.replicaInfos = Collections.emptyMap();
+      this.rsInfos = Collections.emptyMap();
     }
 
-    // Remove any replication server that may have disappeared from the topology
-    replicationServerInfos.keySet().retainAll(rssToKeep);
-
-    for (DSInfo info : dsList)
+    /**
+     * Constructor to use when only the RSInfos need to be recomputed.
+     *
+     * @param dsInfosToKeep
+     *          the DSInfos that will be stored as is
+     * @param newRSInfos
+     *          the new RSInfos from which to compute the new topology
+     * @param dsServerId
+     *          the DS serverId
+     * @param rsServerId
+     *          the current connected RS serverId
+     * @param configuredReplicationServerUrls
+     *          the configured replication server URLs
+     * @param previousRsInfos
+     *          the RSInfos computed in the previous Topology object
+     */
+    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
+        int dsServerId, int rsServerId,
+        Set<String> configuredReplicationServerUrls,
+        Map<Integer, ReplicationServerInfo> previousRsInfos)
     {
-      domain.setEclIncludes(info.getDsId(), info.getEclIncludes(),
-          info.getEclIncludesForDeletes());
+      this.rsServerId = rsServerId;
+      this.replicaInfos = dsInfosToKeep;
+      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
+          previousRsInfos, configuredReplicationServerUrls);
+    }
+
+    /**
+     * Constructor to use when a new TopologyMsg has been received.
+     *
+     * @param topoMsg
+     *          the topology message containing the new DSInfos and RSInfos from
+     *          which to compute the new topology
+     * @param dsServerId
+     *          the DS serverId
+     * @param rsServerId
+     *          the current connected RS serverId
+     * @param configuredReplicationServerUrls
+     *          the configured replication server URLs
+     * @param previousRsInfos
+     *          the RSInfos computed in the previous Topology object
+     */
+    Topology(TopologyMsg topoMsg, int dsServerId,
+        int rsServerId, Set<String> configuredReplicationServerUrls,
+        Map<Integer, ReplicationServerInfo> previousRsInfos)
+    {
+      this.rsServerId = rsServerId;
+      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
+      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
+          previousRsInfos, configuredReplicationServerUrls);
+    }
+
+    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
+        int dsServerId)
+    {
+      final Map<Integer, DSInfo> copy = new HashMap<Integer, DSInfo>(dsInfos);
+      copy.remove(dsServerId);
+      return Collections.unmodifiableMap(copy);
+    }
+
+    private Map<Integer, ReplicationServerInfo> computeRSInfos(
+        int dsServerId, List<RSInfo> newRsInfos,
+        Map<Integer, ReplicationServerInfo> previousRsInfos,
+        Set<String> configuredReplicationServerUrls)
+    {
+      final Map<Integer, ReplicationServerInfo> results =
+          new HashMap<Integer, ReplicationServerInfo>(previousRsInfos);
+
+      // Update replication server info list with the received topology info
+      final Set<Integer> rssToKeep = new HashSet<Integer>();
+      for (RSInfo newRSInfo : newRsInfos)
+      {
+        final int rsId = newRSInfo.getId();
+        rssToKeep.add(rsId); // Mark this server as still existing
+        Set<Integer> connectedDSs =
+            computeDSsConnectedTo(rsId, dsServerId);
+        ReplicationServerInfo rsInfo = results.get(rsId);
+        if (rsInfo == null)
+        {
+          // New replication server, create info for it add it to the list
+          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
+          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
+          results.put(rsId, rsInfo);
+        }
+        else
+        {
+          // Update the existing info for the replication server
+          rsInfo.update(newRSInfo, connectedDSs);
+        }
+      }
+
+      // Remove any replication server that may have disappeared from the
+      // topology
+      results.keySet().retainAll(rssToKeep);
+
+      return Collections.unmodifiableMap(results);
+    }
+
+    /** Computes the list of DSs connected to a particular RS. */
+    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
+    {
+      final Set<Integer> connectedDSs = new HashSet<Integer>();
+      if (rsServerId == rsId)
+      {
+        /*
+         * If we are computing connected DSs for the RS we are connected to, we
+         * should count the local DS as the DSInfo of the local DS is not sent
+         * by the replication server in the topology message. We must count
+         * ourselves as a connected server.
+         */
+        connectedDSs.add(dsServerId);
+      }
+
+      for (DSInfo dsInfo : replicaInfos.values())
+      {
+        if (dsInfo.getRsId() == rsId)
+        {
+          connectedDSs.add(dsInfo.getDsId());
+        }
+      }
+
+      return connectedDSs;
+    }
+
+    /**
+     * Sets the locally configured flag for the passed ReplicationServerInfo
+     * object, analyzing the local configuration.
+     *
+     * @param rsInfo
+     *          the Replication server to check and update
+     * @param configuredReplicationServerUrls
+     */
+    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
+        Set<String> configuredReplicationServerUrls)
+    {
+      // Determine if the passed ReplicationServerInfo has a URL that is present
+      // in the locally configured replication servers
+      String rsUrl = rsInfo.getServerURL();
+      if (rsUrl == null)
+      {
+        // The ReplicationServerInfo has been generated from a server with
+        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
+        // ignore this server as we do not know how to connect to it
+        rsInfo.setLocallyConfigured(false);
+        return;
+      }
+      for (String serverUrl : configuredReplicationServerUrls)
+      {
+        if (isSameReplicationServerUrl(serverUrl, rsUrl))
+        {
+          // This RS is locally configured, mark this
+          rsInfo.setLocallyConfigured(true);
+          rsInfo.setServerURL(serverUrl);
+          return;
+        }
+      }
+      rsInfo.setLocallyConfigured(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj)
+      {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass())
+      {
+        return false;
+      }
+      final Topology other = (Topology) obj;
+      return rsServerId == other.rsServerId
+          && equals(replicaInfos, other.replicaInfos)
+          && equals(rsInfos, other.rsInfos)
+          && urlsEqual1(replicaInfos, other.replicaInfos)
+          && urlsEqual2(rsInfos, other.rsInfos);
+    }
+
+    private boolean equals(Object o1, Object o2)
+    {
+      return o1 == null ? o2 == null : o1.equals(o2);
+    }
+
+    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
+        Map<Integer, DSInfo> replicaInfos2)
+    {
+      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
+      {
+        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
+        if (!equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
+        {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
+        Map<Integer, ReplicationServerInfo> rsInfos2)
+    {
+      for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
+      {
+        ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
+        if (!equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
+        {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + rsServerId;
+      result = prime * result
+          + (replicaInfos == null ? 0 : replicaInfos.hashCode());
+      result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
+      return result;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
+          + ", rsInfos=" + rsInfos.values();
     }
   }
 
@@ -3197,15 +3423,15 @@
       .append(" \"").append(getBaseDN()).append(" ")
       .append(getServerId()).append("\",")
       .append(" groupId=").append(getGroupId())
-      .append(", genId=").append(generationID)
+      .append(", genId=").append(getGenerationID())
       .append(", ");
     connectedRS.get().toString(sb);
     return sb.toString();
   }
 
-  private void debugInfo(String message)
+  private void debugInfo(CharSequence message)
   {
     logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
-        + " and serverId=" + getServerId() + " " + message);
+        + " and serverId=" + getServerId() + ": " + message);
   }
 }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 60a2b4a..1323777 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -95,15 +95,118 @@
  * <p>
  *   Full Initialization of a replica can be triggered by LDAP clients
  *   by creating InitializeTasks or InitializeTargetTask.
- *   Full initialization can also by triggered from the ReplicationDomain
- *   implementation using methods {@link #initializeRemote(int)}
- *   or {@link #initializeFromRemote(int)}.
+ *   Full initialization can also be triggered from the ReplicationDomain
+ *   implementation using methods {@link #initializeRemote(int, Task)}
+ *   or {@link #initializeFromRemote(int, Task)}.
  * <p>
  *   At shutdown time, the {@link #disableService()} method should be called to
  *   cleanly stop the replication service.
  */
 public abstract class ReplicationDomain
 {
+
+  /**
+   * Contains all the attributes included for the ECL (External Changelog).
+   */
+  // @Immutable
+  private final static class ECLIncludes
+  {
+
+    final Map<Integer, Set<String>> includedAttrsByServer;
+    final Set<String> includedAttrsAllServers;
+
+    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
+    final Set<String> includedAttrsForDeletesAllServers;
+
+    private ECLIncludes(
+        Map<Integer, Set<String>> includedAttrsByServer,
+        Set<String> includedAttrsAllServers,
+        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
+        Set<String> includedAttrsForDeletesAllServers)
+    {
+      this.includedAttrsByServer = includedAttrsByServer;
+      this.includedAttrsAllServers = includedAttrsAllServers;
+
+      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
+      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
+    }
+
+    @SuppressWarnings("unchecked")
+    public ECLIncludes()
+    {
+      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
+          Collections.EMPTY_SET);
+    }
+
+    /**
+     * Add attributes to be included in the ECL.
+     *
+     * @param serverId
+     *          Server where these attributes are configured.
+     * @param includeAttributes
+     *          Attributes to be included with all change records, may include
+     *          wild-cards.
+     * @param includeAttributesForDeletes
+     *          Additional attributes to be included with delete change records,
+     *          may include wild-cards.
+     * @return a new {@link ECLIncludes} object if included attributes have
+     *         changed, or the current object otherwise.
+     */
+    public ECLIncludes addIncludedAttributes(int serverId,
+        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
+    {
+      boolean configurationChanged = false;
+
+      Set<String> s1 = new HashSet<String>(includeAttributes);
+
+      // Combine all+delete attributes.
+      Set<String> s2 = new HashSet<String>(s1);
+      s2.addAll(includeAttributesForDeletes);
+
+      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
+      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
+      {
+        configurationChanged = true;
+        eclIncludesByServer = new HashMap<Integer, Set<String>>(
+            this.includedAttrsByServer);
+        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
+      }
+
+      Map<Integer, Set<String>> eclIncludesForDeletesByServer =
+          this.includedAttrsForDeletesByServer;
+      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
+      {
+        configurationChanged = true;
+        eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
+                this.includedAttrsForDeletesByServer);
+        eclIncludesForDeletesByServer.put(
+            serverId, Collections.unmodifiableSet(s2));
+      }
+
+      if (!configurationChanged)
+      {
+        return this;
+      }
+
+      // and rebuild the global list to be ready for usage
+      Set<String> eclIncludesAllServer = new HashSet<String>();
+      for (Set<String> attributes : eclIncludesByServer.values())
+      {
+        eclIncludesAllServer.addAll(attributes);
+      }
+
+      Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
+      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
+      {
+        eclIncludesForDeletesAllServer.addAll(attributes);
+      }
+      return new ECLIncludes(eclIncludesByServer,
+          Collections.unmodifiableSet(eclIncludesAllServer),
+          eclIncludesForDeletesByServer,
+          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
+    }
+  }
+
   /**
    * Current status for this replicated domain.
    */
@@ -251,14 +354,8 @@
    */
   private final CSNGenerator generator;
 
-  private final Object eclIncludesLock = new Object();
-  private final Map<Integer, Set<String>> eclIncludesByServer =
-    new HashMap<Integer, Set<String>>();
-  private Set<String> eclIncludesAllServers = Collections.emptySet();
-
-  private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
-    new HashMap<Integer, Set<String>>();
-  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
+  private final AtomicReference<ECLIncludes> eclIncludes =
+      new AtomicReference<ECLIncludes>(new ECLIncludes());
 
   /**
    * An object used to protect the initialization of the underlying broker
@@ -551,9 +648,9 @@
    * Gets the info for Replicas in the topology (except us).
    * @return The info for Replicas in the topology (except us)
    */
-  public List<DSInfo> getReplicasList()
+  public Map<Integer, DSInfo> getReplicaInfos()
   {
-    return broker.getDsList();
+    return broker.getReplicaInfos();
   }
 
   /**
@@ -562,20 +659,13 @@
    * disconnected. Return null when no server with the provided serverId is
    * connected.
    *
-   * @param  serverId The provided serverId of the remote replica
+   * @param  dsId The provided serverId of the remote replica
    * @return the info related to this remote server if it is connected,
    *                  null is the server is NOT connected.
    */
-  public DSInfo isRemoteDSConnected(int serverId)
+  private DSInfo isRemoteDSConnected(int dsId)
   {
-    for (DSInfo remoteDS : getReplicasList())
-    {
-      if (remoteDS.getDsId() == serverId)
-      {
-        return remoteDS;
-      }
-    }
-    return null;
+    return getReplicaInfos().get(dsId);
   }
 
   /**
@@ -601,9 +691,9 @@
    * @return The info for RSs in the topology (except the one we are connected
    * to)
    */
-  public List<RSInfo> getRsList()
+  public List<RSInfo> getRsInfos()
   {
-    return broker.getRsList();
+    return broker.getRsInfos();
   }
 
 
@@ -1100,7 +1190,7 @@
      *                         for and import, false if the IEContext
      *                         will be used for and export.
      */
-    public IEContext(boolean importInProgress)
+    private IEContext(boolean importInProgress)
     {
       this.importInProgress = importInProgress;
       this.startTime = System.currentTimeMillis();
@@ -1114,7 +1204,7 @@
      * @return A boolean indicating if a total update import is currently in
      *         Progress.
      */
-    public boolean importInProgress()
+    boolean importInProgress()
     {
       return importInProgress;
     }
@@ -1153,18 +1243,17 @@
       entryCount = total;
       entryLeftCount = total;
 
-      if (initializeTask != null)
+      if (initializeTask instanceof InitializeTask)
       {
-        if (initializeTask instanceof InitializeTask)
-        {
-          ((InitializeTask)initializeTask).setTotal(entryCount);
-          ((InitializeTask)initializeTask).setLeft(entryCount);
-        }
-        else if (initializeTask instanceof InitializeTargetTask)
-        {
-          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
-          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
-        }
+        final InitializeTask task = (InitializeTask) initializeTask;
+        task.setTotal(entryCount);
+        task.setLeft(entryCount);
+      }
+      else if (initializeTask instanceof InitializeTargetTask)
+      {
+        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
+        task.setTotal(entryCount);
+        task.setLeft(entryCount);
       }
     }
 
@@ -1177,7 +1266,7 @@
      *
      * @throws DirectoryException if an error occurred.
      */
-    public void updateCounters(int entriesDone) throws DirectoryException
+    private void updateCounters(int entriesDone) throws DirectoryException
     {
       entryLeftCount -= entriesDone;
 
@@ -1198,7 +1287,7 @@
     @Override
     public String toString()
     {
-      return "[ Entry count=" + this.entryCount +
+      return "[Entry count=" + this.entryCount +
              ", Entry left count=" + this.entryLeftCount + "]";
     }
 
@@ -1258,7 +1347,7 @@
      * @param serverId serverId of the acknowledger/receiver/importer server.
      * @param numAck   id of the message received.
      */
-    public void setAckVal(int serverId, int numAck)
+    private void setAckVal(int serverId, int numAck)
     {
       if (logger.isTraceEnabled())
         logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
@@ -1315,6 +1404,7 @@
       if (target >= 0)
       {
         // FIXME Could we check now that it is a know server in the domain ?
+        // JNR: Yes please
       }
       return target;
     }
@@ -1338,7 +1428,7 @@
    *
    * @param target   The server-id of the server that should be initialized.
    *                 The target can be discovered using the
-   *                 {@link #getReplicasList()} method.
+   *                 {@link #getReplicaInfos()} method.
    * @param initTask The task that triggers this initialization and that should
    *                 be updated with its progress.
    *
@@ -1386,13 +1476,10 @@
       logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
           countEntries(), getBaseDNString(), getServerId());
 
-      for (DSInfo dsi : getReplicasList())
-      {
-        ieCtx.startList.add(dsi.getDsId());
-      }
+      ieCtx.startList.addAll(getReplicaInfos().keySet());
 
       // We manage the list of servers with which a flow control can be enabled
-      for (DSInfo dsi : getReplicasList())
+      for (DSInfo dsi : getReplicaInfos().values())
       {
         if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
@@ -1408,7 +1495,7 @@
       ieCtx.startList.add(serverToInitialize);
 
       // We manage the list of servers with which a flow control can be enabled
-      for (DSInfo dsi : getReplicasList())
+      for (DSInfo dsi : getReplicaInfos().values())
       {
         if (dsi.getDsId() == serverToInitialize &&
             dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
@@ -1453,7 +1540,7 @@
         {
           throw new DirectoryException(
               ResultCode.OTHER,
-              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
+              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(),
                   ieCtx.failureList));
         }
 
@@ -1472,7 +1559,7 @@
 
       if (logger.isTraceEnabled())
         logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
-            + " export ends with " + " connected=" + broker.isConnected()
+            + " export ends with connected=" + broker.isConnected()
             + " exportRootException=" + exportRootException);
 
       if (exportRootException != null)
@@ -1592,7 +1679,7 @@
     do
     {
       done = true;
-      for (DSInfo dsi : getReplicasList())
+      for (DSInfo dsi : getReplicaInfos().values())
       {
         if (logger.isTraceEnabled())
           logger.trace(
@@ -1650,10 +1737,7 @@
     considered in the processing of sorting the successfully initialized
     and the others
     */
-    for (DSInfo dsi : getReplicasList())
-    {
-      replicasWeAreWaitingFor.add(dsi.getDsId());
-    }
+    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
 
     boolean done;
     do
@@ -1698,13 +1782,11 @@
             done = false;
             break;
           }
-          else
-          {
-            if (dsInfo.getGenerationId() == getGenerationID())
-            { // and with the expected generationId
-              // We're done with this server
-              it.remove();
-            }
+
+          if (dsInfo.getGenerationId() == getGenerationID())
+          { // and with the expected generationId
+            // We're done with this server
+            it.remove();
           }
         }
       }
@@ -1717,7 +1799,6 @@
           Thread.currentThread().interrupt();
         } // 1sec
       }
-
     }
     while (!done && !broker.shuttingDown()); // infinite wait
 
@@ -1967,8 +2048,8 @@
    *
    * @throws IOException when an error occurred.
    */
-  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
-  throws IOException
+  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
+      throws IOException
   {
     if (logger.isTraceEnabled())
       logger.trace("[IE] Entering exportLDIFEntry entry=" +
@@ -2072,53 +2153,6 @@
   }
 
   /**
-   * Initializes this domain from another source server.
-   * <p>
-   * When this method is called, a request for initialization will
-   * be sent to the source server asking for initialization.
-   * <p>
-   * The {@code exportBackend(OutputStream)} will therefore be called
-   * on the source server, and the {@code importBackend(InputStream)}
-   * will be called on his server.
-   * <p>
-   * The InputStream and OutpuStream given as a parameter to those
-   * methods will be connected through the replication protocol.
-   *
-   * @param source   The server-id of the source from which to initialize.
-   *                 The source can be discovered using the
-   *                 {@link #getReplicasList()} method.
-   *
-   * @throws DirectoryException If it was not possible to publish the
-   *                            Initialization message to the Topology.
-   */
-  public void initializeFromRemote(int source) throws DirectoryException
-  {
-    initializeFromRemote(source, null);
-  }
-
-  /**
-   * Initializes a remote server from this server.
-   * <p>
-   * The {@code exportBackend(OutputStream)} will therefore be called
-   * on this server, and the {@code importBackend(InputStream)}
-   * will be called on the remote server.
-   * <p>
-   * The InputStream and OutpuStream given as a parameter to those
-   * methods will be connected through the replication protocol.
-   *
-   * @param target   The server-id of the server that should be initialized.
-   *                 The target can be discovered using the
-   *                 {@link #getReplicasList()} method.
-   *
-   * @throws DirectoryException If it was not possible to publish the
-   *                            Initialization message to the Topology.
-   */
-  public void initializeRemote(int target) throws DirectoryException
-  {
-    initializeRemote(target, null);
-  }
-
-  /**
    * Initializes asynchronously this domain from a remote source server.
    * Before returning from this call, for the provided task :
    * - the progressing counters are updated during the initialization using
@@ -2131,7 +2165,7 @@
    *
    * @param source   The server-id of the source from which to initialize.
    *                 The source can be discovered using the
-   *                 {@link #getReplicasList()} method.
+   *                 {@link #getReplicaInfos()} method.
    *
    * @param initTask The task that launched the initialization
    *                 and should be updated of its progress.
@@ -2217,7 +2251,7 @@
    *                          task has initially been created (this server,
    *                          or the remote server).
    */
-  void initialize(InitializeTargetMsg initTargetMsgReceived,
+  private void initialize(InitializeTargetMsg initTargetMsgReceived,
       int requesterServerId)
   {
     InitializeTask initFromTask = null;
@@ -2254,7 +2288,6 @@
       ieCtx.importSource = source;
       ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
       ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
-      // Protocol version is -1 when not known.
       ieCtx.exporterProtocolVersion = getProtocolVersion(source);
       initFromTask = (InitializeTask) ieCtx.initializeTask;
 
@@ -2382,18 +2415,14 @@
    * @param dsServerId The provided serverId.
    * @return The protocol version.
    */
-  short getProtocolVersion(int dsServerId)
+  private short getProtocolVersion(int dsServerId)
   {
-    short protocolVersion = -1;
-    for (DSInfo dsi : getReplicasList())
+    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
+    if (dsInfo != null)
     {
-      if (dsi.getDsId() == dsServerId)
-      {
-        protocolVersion = dsi.getProtocolVersion();
-        break;
-      }
+      return dsInfo.getProtocolVersion();
     }
-    return protocolVersion;
+    return -1;
   }
 
   /**
@@ -2459,7 +2488,7 @@
     for (int i = 0; i< 50; i++)
     {
       allSet = true;
-      for (RSInfo rsInfo : getRsList())
+      for (RSInfo rsInfo : getRsInfos())
       {
         // the 'empty' RSes (generationId==-1) are considered as good citizens
         if (rsInfo.getGenerationId() != -1 &&
@@ -2498,7 +2527,7 @@
    *                           connected to a Replication Server or it
    *                           was not possible to contact it.
    */
-  public void resetReplicationLog() throws DirectoryException
+  void resetReplicationLog() throws DirectoryException
   {
     // Reset the Generation ID to -1 to clean the ReplicationServers.
     resetGenerationId(-1L);
@@ -2913,7 +2942,7 @@
       {
         // create the broker object used to publish and receive changes
         broker = new ReplicationBroker(
-            this, state, config, getGenerationID(), new ReplSessionSecurity());
+            this, state, config, new ReplSessionSecurity());
         broker.start();
       }
     }
@@ -3067,8 +3096,8 @@
   }
 
   /**
-   * Applies a configuration change to the attributes which should be be
-   * included in the ECL.
+   * Applies a configuration change to the attributes which should be included
+   * in the ECL.
    *
    * @param includeAttributes
    *          attributes to be included with all change records.
@@ -3385,7 +3414,7 @@
    * @param msg  The byte array containing the information that should
    *             be sent to the remote entities.
    */
-  public void publish(byte[] msg)
+  void publish(byte[] msg)
   {
     UpdateMsg update;
     synchronized (this)
@@ -3489,46 +3518,16 @@
       Set<String> includeAttributes,
       Set<String> includeAttributesForDeletes)
   {
-    boolean configurationChanged = false;
-
-    synchronized (eclIncludesLock)
+    ECLIncludes current;
+    ECLIncludes updated;
+    do
     {
-      Set<String> s1 = new HashSet<String>(includeAttributes);
-
-      // Combine all+delete attributes.
-      Set<String> s2 = new HashSet<String>(s1);
-      s2.addAll(includeAttributesForDeletes);
-
-      if (!s1.equals(eclIncludesByServer.get(serverId)))
-      {
-        configurationChanged = true;
-        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
-      }
-
-      if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
-      {
-        configurationChanged = true;
-        eclIncludesForDeletesByServer.put(serverId,
-            Collections.unmodifiableSet(s2));
-      }
-
-      // and rebuild the global list to be ready for usage
-      Set<String> s = new HashSet<String>();
-      for (Set<String> attributes : eclIncludesByServer.values())
-      {
-        s.addAll(attributes);
-      }
-      eclIncludesAllServers = Collections.unmodifiableSet(s);
-
-      s = new HashSet<String>();
-      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
-      {
-        s.addAll(attributes);
-      }
-      eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s);
+      current = this.eclIncludes.get();
+      updated = current.addIncludedAttributes(
+          serverId, includeAttributes, includeAttributesForDeletes);
     }
-
-    return configurationChanged;
+    while (!this.eclIncludes.compareAndSet(current, updated));
+    return current != updated;
   }
 
 
@@ -3540,10 +3539,7 @@
    */
   public Set<String> getEclIncludes()
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesAllServers;
-    }
+    return eclIncludes.get().includedAttrsAllServers;
   }
 
 
@@ -3555,10 +3551,7 @@
    */
   public Set<String> getEclIncludesForDeletes()
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesForDeletesAllServers;
-    }
+    return eclIncludes.get().includedAttrsForDeletesAllServers;
   }
 
 
@@ -3573,10 +3566,7 @@
    */
   Set<String> getEclIncludes(int serverId)
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesByServer.get(serverId);
-    }
+    return eclIncludes.get().includedAttrsByServer.get(serverId);
   }
 
 
@@ -3591,10 +3581,7 @@
    */
   Set<String> getEclIncludesForDeletes(int serverId)
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesForDeletesByServer.get(serverId);
-    }
+    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
   }
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index 589f539..b3ca6b7 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -34,7 +34,7 @@
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.service.ReplicationDomain.IEContext;
+import org.opends.server.replication.service.ReplicationDomain.*;
 import org.opends.server.types.*;
 
 /**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index fee554b..207f48b 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -214,7 +214,7 @@
   {
     final ReplicationBroker broker = new ReplicationBroker(
         new DummyReplicationDomain(generationId), new ServerState(),
-        config, generationId, getReplSessionSecurity());
+        config, getReplSessionSecurity());
     connect(broker, port, timeout);
     return broker;
   }
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 8ba7ec4..9d56f00 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -37,6 +37,7 @@
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.opendj.ldap.ModificationType;
 import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.replication.ReplicationTestCase;
@@ -639,6 +640,7 @@
 
   private static final String REPLICATION_GENERATION_ID =
     "ds-sync-generation-id";
+  private static final Task NO_INIT_TASK = null;
 
   private long readGenIdFromSuffixRootEntry(String rootDn) throws Exception
   {
@@ -1059,7 +1061,7 @@
       replicationDomain.initExport(exportLdif, 2);
 
       // Perform full update from fake domain to fractional domain
-      replicationDomain.initializeRemote(DS1_ID);
+      replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK);
 
       /*
        * Check fractional domain is operational and that filtering has been done
@@ -1294,10 +1296,10 @@
       replicationDomain.initExport(exportLdif, 2);
 
       // Perform full update from fake domain to fractional domain
-      replicationDomain.initializeRemote(DS1_ID);
+      replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK);
 
       /*
-       * Chack fractional domain is operational and that filtering has been done
+       * Check fractional domain is operational and that filtering has been done
        * during the full update
        */
 
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 219958d..f952725 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -73,7 +73,7 @@
     public TestBroker(List<ReplicationMsg> list)
     {
       super(new DummyReplicationDomain(0), null,
-          new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), 0, null);
+          new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), null);
       this.list = list;
     }
 
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
index f44afdb..293f5da 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,7 +29,6 @@
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -228,7 +227,7 @@
     fakeCfg.setChangetimeHeartbeatInterval(500);
     ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
     ReplicationBroker broker = new ReplicationBroker(
-        new DummyReplicationDomain(generationId), state, fakeCfg, generationId, security);
+        new DummyReplicationDomain(generationId), state, fakeCfg, security);
     broker.start();
     checkConnection(30, broker, rs1Port);
     return broker;
@@ -403,20 +402,13 @@
   {
     for (int count = 0; count< 50; count++)
     {
-      List<DSInfo> dsList = ds3.getDsList();
-      DSInfo ds3Info = null;
-      if (dsList.size() > 0)
-      {
-        ds3Info = dsList.get(0);
-      }
-      if (ds3Info != null
-          && ds3Info.getDsId() == DS2_ID
-          && ds3Info.getStatus() == ServerStatus.DEGRADED_STATUS)
+      DSInfo dsInfo = ds3.getReplicaInfos().get(DS2_ID);
+      if (dsInfo != null && dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS)
       {
         break;
       }
 
-      assertTrue(count < 50, "DS2 did not get degraded : " + ds3Info);
+      assertTrue(count < 50, "DS2 did not get degraded : " + dsInfo);
       Thread.sleep(200); // Be sure status analyzer has time to test
     }
   }
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index b14f113..70a7d3d 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -1020,10 +1020,11 @@
           rd.getGroupId(), rd.getRefUrls(),
           rd.getEclIncludes(), rd.getEclIncludesForDeletes(),
           ProtocolVersion.getCurrentVersion());
-      final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList());
+      final List<DSInfo> dsList =
+          new ArrayList<DSInfo>(rd.getReplicaInfos().values());
       dsList.add(dsInfo);
 
-     TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
+     TopoView dsTopoView = new TopoView(dsList, rd.getRsInfos());
      assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId);
    }
   }
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index 9dd824d..32712bc 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -32,10 +32,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.zip.DataFormatException;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.opendj.ldap.ModificationType;
+import org.assertj.core.api.Assertions;
 import org.opends.server.core.AddOperationBasis;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ModifyDNOperationBasis;
@@ -50,6 +52,7 @@
 import org.testng.annotations.Test;
 
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.replication.protocol.OperationContext.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 import static org.opends.server.util.StaticUtils.*;
@@ -1036,61 +1039,47 @@
     assertEquals(bi.toString(16), pdu);
   }
 
-  @DataProvider(name="createTopologyData")
+  @DataProvider
   public Object [][] createTopologyData() throws Exception
   {
-    List<String> urls1 = new ArrayList<String>();
-    urls1.add("ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)");
-    urls1.add("ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber");
+    List<String> urls1 = newList(
+        "ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)",
+        "ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber");
 
-    List<String> urls2 = new ArrayList<String>();
+    List<String> urls2 = newList();
 
-    List<String> urls3 = new ArrayList<String>();
-    urls3.add("ldaps://host:port/dc=foo??sub?(sn=One Entry)");
+    List<String> urls3 = newList(
+        "ldaps://host:port/dc=foo??sub?(sn=One Entry)");
 
-    List<String> urls4 = new ArrayList<String>();
-    urls4.add("ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)");
-    urls4.add("ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
+    List<String> urls4 = newList(
+        "ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)",
+        "ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
 
-    DSInfo dsInfo1 = new DSInfo(13, "dsHost1:111", 26, 154631, ServerStatus.FULL_UPDATE_STATUS,
+    DSInfo dsInfo1 = new DSInfo(13, "", 26, 154631, ServerStatus.FULL_UPDATE_STATUS,
       false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, new HashSet<String>(), new HashSet<String>(), (short)-1);
 
-    DSInfo dsInfo2 = new DSInfo(-436, "dsHost2:222", 493, -227896, ServerStatus.DEGRADED_STATUS,
+    DSInfo dsInfo2 = new DSInfo(-436, "", 493, -227896, ServerStatus.DEGRADED_STATUS,
       true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, new HashSet<String>(), new HashSet<String>(), (short)-1);
 
-    DSInfo dsInfo3 = new DSInfo(2436, "dsHost3:333", 591, 0, ServerStatus.NORMAL_STATUS,
+    DSInfo dsInfo3 = new DSInfo(2436, "", 591, 0, ServerStatus.NORMAL_STATUS,
       false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, new HashSet<String>(), new HashSet<String>(), (short)-1);
 
-    DSInfo dsInfo4 = new DSInfo(415, "dsHost4:444", 146, 0, ServerStatus.BAD_GEN_ID_STATUS,
+    DSInfo dsInfo4 = new DSInfo(415, "", 146, 0, ServerStatus.BAD_GEN_ID_STATUS,
       true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, new HashSet<String>(), new HashSet<String>(), (short)-1);
 
-    List<DSInfo> dsList1 = new ArrayList<DSInfo>();
-    dsList1.add(dsInfo1);
-
-    List<DSInfo> dsList2 = new ArrayList<DSInfo>();
-
-    List<DSInfo> dsList3 = new ArrayList<DSInfo>();
-    dsList3.add(dsInfo2);
-
-    List<DSInfo> dsList4 = new ArrayList<DSInfo>();
-    dsList4.add(dsInfo4);
-    dsList4.add(dsInfo3);
-    dsList4.add(dsInfo2);
-    dsList4.add(dsInfo1);
+    Set<DSInfo> dsList1 = newSet(dsInfo1);
+    Set<DSInfo> dsList2 = newSet();
+    Set<DSInfo> dsList3 = newSet(dsInfo2);
+    Set<DSInfo> dsList4 = newSet(dsInfo4, dsInfo3, dsInfo2, dsInfo1);
 
     RSInfo rsInfo1 = new RSInfo(4527, null, 45316, (byte)103, 1);
     RSInfo rsInfo2 = new RSInfo(4527, null, 0, (byte)0, 1);
     RSInfo rsInfo3 = new RSInfo(0, null, -21113, (byte)98, 1);
 
-    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
-    rsList1.add(rsInfo1);
+    List<RSInfo> rsList1 = newList(rsInfo1);
+    List<RSInfo> rsList2 = newList(rsInfo1, rsInfo2, rsInfo3);
 
-    List<RSInfo> rsList2 = new ArrayList<RSInfo>();
-    rsList2.add(rsInfo1);
-    rsList2.add(rsInfo2);
-    rsList2.add(rsInfo3);
-
-    return new Object [][] {
+    return new Object[][] {
       {"1a01313300323600313534363331000300020c84026c6461703a2f2f6c6461702e697" +
        "06c616e65742e636f6d2f6f3d746573743f3f7375623f28736e3d4a656e73656e2900" +
        "6c646170733a2f2f6c6461702e69706c616e65742e636f6d3a343034312f7569643d6" +
@@ -1098,9 +1087,9 @@
        "6c6570686f6e654e756d6265720001343532370034353331360067",dsList1, rsList1},
       {"1a0003343532370034353331360067343532370030000030002d32313131330062", dsList2, rsList2},
       {"1a012d34333600343933002d32323738393600020101f9f70001343532370034353331360067", dsList3, rsList1},
-      {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, new ArrayList<RSInfo>()},
-      {"1a0001343532370034353331360067", new ArrayList<DSInfo>(), rsList1},
-      {"1a0000", new ArrayList<DSInfo>(), new ArrayList<RSInfo>()},
+      {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, newList()},
+      {"1a0001343532370034353331360067", newSet(), rsList1},
+      {"1a0000", newSet(), newList()},
       {"1a0434313500313436003000040102020f026c646170733a2f2f686f73743a706f727" +
        "42f64633d666f6f626172313f3f7375623f28736e3d416e6f7468657220456e747279" +
        "203129006c646170733a2f2f686f73743a706f72742f64633d666f6f626172323f3f7" +
@@ -1117,15 +1106,21 @@
   }
 
   @Test(dataProvider = "createTopologyData")
-  public void oldTopologyPDUs(String oldPdu, List<DSInfo> dsList, List<RSInfo> rsList)
+  public void oldTopologyPDUs(String oldPdu, Set<DSInfo> dsList, List<RSInfo> rsList)
          throws Exception
   {
     TopologyMsg msg = new TopologyMsg(hexStringToByteArray(oldPdu),
         ProtocolVersion.REPLICATION_PROTOCOL_V3);
-    assertEquals(msg.getDsList(), dsList);
-    assertEquals(msg.getRsList(), rsList);
-    BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3));
-    assertEquals(bi.toString(16), oldPdu);
+    Assertions.assertThat(new HashSet<DSInfo>(msg.getReplicaInfos().values()))
+        .isEqualTo(dsList);
+    assertEquals(msg.getRsInfos(), rsList);
+    if (msg.getReplicaInfos().values().equals(dsList))
+    {
+      // Unfortunately this check does not work when the order of the
+      // replicaInfos collection is not exactly the same as the dsList
+      BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3));
+      assertEquals(bi.toString(16), oldPdu);
+    }
   }
 
   @DataProvider(name="createEntryMsgData")
@@ -1137,9 +1132,8 @@
     int pos = 0;
     int length = 2;
     int msgid = 14;
-    Object[] set1 = new Object[] {sid, dest, entryBytes, pos, length, msgid};
 
-    return new Object [][] { set1};
+    return new Object[][] { { sid, dest, entryBytes, pos, length, msgid } };
   }
 
   /**
@@ -1186,8 +1180,7 @@
     int sender = 1;
     int dest = 2;
     LocalizableMessage message = ERR_UNKNOWN_TYPE.get("toto");
-    Object[] set1 = new Object[] {sender, dest, message};
-    return new Object [][] { set1};
+    return new Object[][] { { sender, dest, message } };
   }
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 755a1a1..c748d51 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -916,8 +916,8 @@
     TopologyMsg msg = new TopologyMsg(dsList, rsList);
     TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
         ProtocolVersion.getCurrentVersion());
-    assertEquals(msg.getDsList(), newMsg.getDsList());
-    assertEquals(msg.getRsList(), newMsg.getRsList());
+    assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos());
+    assertEquals(msg.getRsInfos(), newMsg.getRsInfos());
   }
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index eaed2d3..a9a8dc0 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -1725,14 +1725,14 @@
   private void waitForStableTopo(FakeReplicationDomain fakeRd, int expectedDs,
       int expectedRs) throws Exception
   {
-    List<DSInfo> dsInfo = null;
+    Map<Integer, DSInfo> dsInfo = null;
     List<RSInfo> rsInfo = null;
     long nSec = 0;
     long startTime = System.currentTimeMillis();
     do
     {
-      dsInfo = fakeRd.getReplicasList();
-      rsInfo = fakeRd.getRsList();
+      dsInfo = fakeRd.getReplicaInfos();
+      rsInfo = fakeRd.getRsInfos();
       if (dsInfo.size() == expectedDs && rsInfo.size() == expectedRs)
       {
         debugInfo("waitForStableTopo: expected topo obtained after " + nSec + " second(s).");
@@ -3123,8 +3123,7 @@
       // DS must see expected numbers of DSs/RSs
       final FakeReplicationDomain fakeRd1 = fakeRDs[1];
       waitForStableTopo(fakeRd1, 1, 1);
-      List<DSInfo> dsInfos = fakeRd1.getReplicasList();
-      DSInfo dsInfo = dsInfos.get(0);
+      DSInfo dsInfo = fakeRd1.getReplicaInfos().get(FDS2_ID);
       assertEquals(dsInfo.getDsId(), FDS2_ID);
       assertEquals(dsInfo.getStatus(), ServerStatus.NORMAL_STATUS);
 
@@ -3144,27 +3143,7 @@
       }
 
       // Wait for DS2 being degraded
-      boolean error = true;
-      for (int count = 0; count < 12; count++)
-      {
-        dsInfos = fakeRd1.getReplicasList();
-        if (dsInfos == null)
-          continue;
-        if (dsInfos.size() == 0)
-          continue;
-        dsInfo = dsInfos.get(0);
-        if ( (dsInfo.getDsId() == FDS2_ID) &&
-            (dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS) )
-        {
-          error = false;
-          break;
-        }
-        else
-        {
-          Thread.sleep(1000);
-        }
-      }
-      assertFalse(error, "DS2 not in degraded status");
+      expectStatusForDS(fakeRd1, ServerStatus.DEGRADED_STATUS, FDS2_ID);
 
       Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
       assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
@@ -3226,27 +3205,7 @@
       fakeRd2.startListenService();
 
       // Wait for DS2 being back to normal
-      error = true;
-      for (int count = 0; count < 12; count++)
-      {
-        dsInfos = fakeRd1.getReplicasList();
-        if (dsInfos == null)
-          continue;
-        if (dsInfos.size() == 0)
-          continue;
-        dsInfo = dsInfos.get(0);
-        if ( (dsInfo.getDsId() == FDS2_ID) &&
-            (dsInfo.getStatus() == ServerStatus.NORMAL_STATUS) )
-        {
-          error = false;
-          break;
-        }
-        else
-        {
-          Thread.sleep(1000);
-        }
-      }
-      assertFalse(error, "DS2 not back to normal status");
+      expectStatusForDS(fakeRd1, ServerStatus.NORMAL_STATUS, FDS2_ID);
 
       // DS2 should also change status so reset its assured monitoring data so no received sr updates
       assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
@@ -3317,12 +3276,29 @@
       assertEquals(fakeRd2.getReceivedUpdates(), 6);
       assertEquals(fakeRd2.getWrongReceivedUpdates(), 1);
       assertFalse(fakeRd2.receivedUpdatesOk());
-    } finally
+    }
+    finally
     {
       endTest();
     }
   }
 
+  private void expectStatusForDS(final ReplicationDomain domain,
+      ServerStatus expectedStatus, int dsId) throws InterruptedException
+  {
+    for (int count = 0; count < 12; count++)
+    {
+      final DSInfo dsInfo = domain.getReplicaInfos().get(dsId);
+      if (dsInfo != null && dsInfo.getStatus() == expectedStatus)
+      {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.fail("DS(" + dsId + ") did not have expected status "
+        + expectedStatus + " after 12 seconds");
+  }
+
   private void assertContainsOnly(Map<Integer, Integer> map, int key,
       int expectedValue)
   {
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 7d4498e..0c19525 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -359,7 +359,7 @@
       final long generationId = getGenerationId(TEST_ROOT_DN);
       broker = new ReplicationBroker(new DummyReplicationDomain(generationId),
           state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort),
-          generationId, getReplSessionSecurity());
+          getReplSessionSecurity());
       connect(broker, replicationServerPort, 5000);
 
       ReplicationMsg receivedMsg = broker.receive();
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java
new file mode 100644
index 0000000..1eace5c
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java
@@ -0,0 +1,268 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.service;
+
+import java.util.*;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.service.ReplicationBroker.ReplicationServerInfo;
+import org.opends.server.replication.service.ReplicationBroker.Topology;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static java.util.Collections.*;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
+@SuppressWarnings("javadoc")
+public class ReplicationBrokerTest extends DirectoryServerTestCase
+{
+
+  private static enum TopologyCtorToUse
+  {
+    BUILD_WITH_TOPOLOGY_MSG, BUILD_WITH_DS_RS_LISTS;
+  }
+
+  private final int CURRENT_RS_ID = 91;
+  private final int MISSING_RS_ID = 93;
+  private final int ANOTHER_RS_ID = 94;
+  private final DSInfo CURRENT_DS = dsInfo(11, CURRENT_RS_ID);
+  private final DSInfo OTHER_DS = dsInfo(12, CURRENT_RS_ID);
+  private final DSInfo MISSING_DS = dsInfo(13, CURRENT_RS_ID);
+  private final ReplicationServerInfo CURRENT_RS = rsInfo(CURRENT_RS_ID,
+      CURRENT_DS.getDsId(), OTHER_DS.getDsId());
+  private final ReplicationServerInfo MISSING_RS = rsInfo(MISSING_RS_ID,
+      MISSING_DS.getDsId());
+  private final ReplicationServerInfo ANOTHER_RS = rsInfo(ANOTHER_RS_ID);
+
+  @SuppressWarnings("unchecked")
+  private DSInfo dsInfo(int dsServerId, int rsServerId)
+  {
+    byte z = 0;
+    return new DSInfo(dsServerId, null, rsServerId, 0, null,
+        false, null, z, z, EMPTY_LIST, EMPTY_LIST, EMPTY_LIST, z);
+  }
+
+  private ReplicationServerInfo rsInfo(int rsServerId, Integer... dsIds)
+  {
+    byte z = 0;
+    final RSInfo info = new RSInfo(rsServerId, rsServerId + ":1389", 0, z, 0);
+    return new ReplicationServerInfo(info, newSet(dsIds));
+  }
+
+  private Map<Integer, ReplicationServerInfo> newMap(ReplicationServerInfo... infos)
+  {
+    if (infos.length == 0)
+    {
+      return Collections.emptyMap();
+    }
+    final Map<Integer, ReplicationServerInfo> map =
+        new HashMap<Integer, ReplicationServerInfo>();
+    for (ReplicationServerInfo info : infos)
+    {
+      map.put(info.getServerId(), info);
+    }
+    return map;
+  }
+
+  private void assertInvariants(final Topology topo)
+  {
+    assertThat(topo.replicaInfos).doesNotContainKey(CURRENT_DS.getDsId());
+  }
+
+  private ReplicationServerInfo assertContainsRSWithDSs(
+      Map<Integer, ReplicationServerInfo> rsInfos,
+      ReplicationServerInfo rsInfo, Integer... connectedDSs)
+  {
+    return assertContainsRSWithDSs(rsInfos, rsInfo, newSet(connectedDSs));
+  }
+
+  private ReplicationServerInfo assertContainsRSWithDSs(
+      Map<Integer, ReplicationServerInfo> rsInfos,
+      ReplicationServerInfo rsInfo, Set<Integer> connectedDSs)
+  {
+    final ReplicationServerInfo info = find(rsInfos, rsInfo.toRSInfo());
+    assertNotNull(info);
+    assertThat(info.getConnectedDSs()).containsAll(connectedDSs);
+    return info;
+  }
+
+  private ReplicationServerInfo find(Map<Integer, ReplicationServerInfo> rsInfos, RSInfo rsInfo)
+  {
+    for (ReplicationServerInfo info : rsInfos.values())
+    {
+      if (info.getServerId() == rsInfo.getId())
+      {
+        return info;
+      }
+    }
+    return null;
+  }
+
+  private Topology newTopology(TopologyCtorToUse toUse,
+      Map<Integer, DSInfo> replicaInfos, List<RSInfo> rsInfos, int dsServerId, int rsServerId,
+      Set<String> rsUrls, Map<Integer, ReplicationServerInfo> previousRSs)
+  {
+    if (TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG == toUse)
+    {
+      final TopologyMsg topologyMsg = new TopologyMsg(replicaInfos.values(), rsInfos);
+      return new Topology(topologyMsg, dsServerId, rsServerId, rsUrls, previousRSs);
+    }
+    else if (TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS == toUse)
+    {
+      return new Topology(replicaInfos, rsInfos, dsServerId, rsServerId, rsUrls, previousRSs);
+    }
+    Assert.fail("Do not know which Topology constructor to use: " + toUse);
+    return null;
+  }
+
+  private Map<Integer, DSInfo> newMap(DSInfo... dsInfos)
+  {
+    final Map<Integer, DSInfo> results = new HashMap<Integer, DSInfo>();
+    for (DSInfo dsInfo : dsInfos)
+    {
+      results.put(dsInfo.getDsId(), dsInfo);
+    }
+    return results;
+  }
+
+  @DataProvider
+  public Object[][] topologyCtorProvider() {
+    return new Object[][] { { TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG },
+      { TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS } };
+  }
+
+  @Test(dataProvider = "topologyCtorProvider")
+  @SuppressWarnings("unchecked")
+  public void topologyShouldContainNothing(TopologyCtorToUse toUse)
+      throws Exception
+  {
+    final Topology topo = newTopology(toUse,
+        EMPTY_MAP, EMPTY_LIST,
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).isEmpty();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void topologyShouldFilterOutCurrentDS()
+  {
+    final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG,
+        newMap(OTHER_DS, CURRENT_DS), EMPTY_LIST,
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).isEmpty();
+  }
+
+  @Test(dataProvider = "topologyCtorProvider")
+  @SuppressWarnings("unchecked")
+  public void topologyShouldContainRSWithoutOtherDS(TopologyCtorToUse toUse)
+  {
+    final Topology topo = newTopology(toUse,
+        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).hasSize(1);
+    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void topologyShouldContainRSWithAllDSs_buildWithTopologyMsg()
+  {
+    final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG,
+        newMap(CURRENT_DS, OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).hasSize(1);
+    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+  }
+
+  @Test(dataProvider = "topologyCtorProvider")
+  @SuppressWarnings("unchecked")
+  public void topologyShouldStillContainRS(TopologyCtorToUse toUse) throws Exception
+  {
+    final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS);
+    final Topology topo = newTopology(toUse,
+        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).hasSize(1);
+    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+  }
+
+  @Test(dataProvider = "topologyCtorProvider")
+  @SuppressWarnings("unchecked")
+  public void topologyShouldStillContainRSWithNewlyProvidedDSs(TopologyCtorToUse toUse)
+  {
+    final ReplicationServerInfo CURRENT_RS_WITHOUT_DS = rsInfo(CURRENT_RS_ID);
+    final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS_WITHOUT_DS);
+    final Topology topo = newTopology(toUse,
+        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).hasSize(1);
+    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId(), OTHER_DS.getDsId());
+  }
+
+  @Test(dataProvider = "topologyCtorProvider")
+  @SuppressWarnings("unchecked")
+  public void topologyShouldHaveRemovedMissingRS(TopologyCtorToUse toUse)
+  {
+    final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS, MISSING_RS);
+    final Topology topo = newTopology(toUse,
+        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).hasSize(1);
+    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void topologyShouldHaveStampedLocallyConfiguredRSs_buildWithDsRsLists()
+  {
+    final Set<String> locallyConfigured = newSet(CURRENT_RS.getServerURL());
+    final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS,
+        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo(), ANOTHER_RS.toRSInfo()),
+        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), locallyConfigured, EMPTY_MAP);
+    assertInvariants(topo);
+    assertThat(topo.rsInfos).hasSize(2);
+    ReplicationServerInfo currentRS =
+        assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+    ReplicationServerInfo anotherRS =
+        assertContainsRSWithDSs(topo.rsInfos, ANOTHER_RS);
+    assertThat(currentRS.isLocallyConfigured()).isTrue();
+    assertThat(anotherRS.isLocallyConfigured()).isFalse();
+  }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 33da154..1f612fe 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -26,12 +26,16 @@
  */
 package org.opends.server.replication.service;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.task.Task;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.DSInfo;
 import org.opends.server.replication.common.RSInfo;
@@ -42,6 +46,7 @@
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.service.ReplicationDomain.IEContext;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -54,6 +59,8 @@
 @SuppressWarnings("javadoc")
 public class ReplicationDomainTest extends ReplicationTestCase
 {
+  private static final Task NO_INIT_TASK = null;
+
   @DataProvider(name = "publishAndReceiveData")
   public Object[][] createpublishAndReceiveData()
   {
@@ -116,14 +123,14 @@
       assertNotNull(rcvdMsg);
       assertEquals(test, rcvdMsg.getPayload());
 
-      for (RSInfo replServerInfo : domain1.getRsList())
+      for (RSInfo replServerInfo : domain1.getRsInfos())
       {
         // The generation Id of the remote should be 1
         assertEquals(replServerInfo.getGenerationId(), 1,
             "Unexpected value of generationId in RSInfo for RS=" + replServerInfo);
       }
 
-      for (DSInfo serverInfo : domain1.getReplicasList())
+      for (DSInfo serverInfo : domain1.getReplicaInfos().values())
       {
         assertEquals(serverInfo.getStatus(), ServerStatus.NORMAL_STATUS);
       }
@@ -132,7 +139,7 @@
       domain1.resetReplicationLog();
       Thread.sleep(500);
 
-      for (RSInfo replServerInfo : domain1.getRsList())
+      for (RSInfo replServerInfo : domain1.getRsInfos())
       {
         // The generation Id of the remote should now be 2
         assertEquals(replServerInfo.getGenerationId(), 2,
@@ -144,9 +151,9 @@
       {
         try
         {
-          assertExpectedServerStatuses(domain1.getReplicasList(),
+          assertExpectedServerStatuses(domain1.getReplicaInfos(),
               domain1ServerId, domain2ServerId);
-          assertExpectedServerStatuses(domain2.getReplicasList(),
+          assertExpectedServerStatuses(domain2.getReplicaInfos(),
               domain1ServerId, domain2ServerId);
 
           Map<Integer, ServerState> states1 = domain1.getReplicaStates();
@@ -178,13 +185,15 @@
     }
   }
 
-  private void assertExpectedServerStatuses(List<DSInfo> dsInfos,
+  private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos,
       int domain1ServerId, int domain2ServerId)
   {
-    for (DSInfo serverInfo : dsInfos)
+    for (DSInfo serverInfo : dsInfos.values())
     {
       if (serverInfo.getDsId() == domain2ServerId)
+      {
         assertEquals(serverInfo.getStatus(), ServerStatus.BAD_GEN_ID_STATUS);
+      }
       else
       {
         assertEquals(serverInfo.getDsId(), domain1ServerId);
@@ -330,15 +339,7 @@
        * Trigger a total update from domain1 to domain2.
        * Check that the exported data is correctly received on domain2.
        */
-      for (DSInfo remoteDS : domain2.getReplicasList())
-      {
-        if (remoteDS.getDsId() != domain2.getServerId())
-        {
-          domain2.initializeFromRemote(remoteDS.getDsId());
-          break;
-        }
-      }
-
+      assertTrue(initializeFromRemote(domain2));
       waitEndExport(exportedData, importedData);
       assertExportSucessful(domain1, domain2, exportedData, importedData);
     }
@@ -349,6 +350,19 @@
     }
   }
 
+  private boolean initializeFromRemote(ReplicationDomain domain) throws DirectoryException
+  {
+    for (DSInfo remoteDS : domain.getReplicaInfos().values())
+    {
+      if (remoteDS.getDsId() != domain.getServerId())
+      {
+        domain.initializeFromRemote(remoteDS.getDsId(), NO_INIT_TASK);
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Test that a ReplicationDomain is able to export and import its database
    * across 2 replication servers.
@@ -387,7 +401,7 @@
       domain2 = new FakeReplicationDomain(
           testService, 2, servers2, 0, null, importedData, 0);
 
-      domain2.initializeFromRemote(1);
+      domain2.initializeFromRemote(1, NO_INIT_TASK);
 
       waitEndExport(exportedData, importedData);
       assertExportSucessful(domain1, domain2, exportedData, importedData);
@@ -517,23 +531,10 @@
        * Trigger a total update from domain1 to domain2.
        * Check that the exported data is correctly received on domain2.
        */
-      boolean alone = true;
-      while (alone)
+      while (!initializeFromRemote(domain1))
       {
-        for (DSInfo remoteDS : domain1.getReplicasList())
-        {
-          if (remoteDS.getDsId() != domain1.getServerId())
-          {
-            alone = false;
-            domain1.initializeFromRemote(remoteDS.getDsId() , null);
-            break;
-          }
-        }
-        if (alone)
-        {
-          System.out.println("trying...");
-          Thread.sleep(1000);
-        }
+        System.out.println("trying...");
+        Thread.sleep(1000);
       }
       System.out.println("waiting");
       Thread.sleep(10000000);

--
Gitblit v1.10.0