mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.08.2014 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a
OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR


Improved design for Replication Topology


ReplicationBroker.java + *Test.java:
Extracted the Topology class to encapsulate the dsList and replicationServerInfos fields + atomically set it via an AtomicReference + moved setLocallyConfiguredFlag(), isSameReplicationServerUrl(), computeConnectedDSs() to Topology class.
Created methods computeNewTopology() and topologyChange() to compute and set the new topology.
Removed generationID instance variable duplicated with the one from ReplicationDomain + updated ctor and setGenerationID().
Improved debugging messages.
Renamed getDsList() to getReplicaInfos() + changed return type from List<DSInfo> to Map<Integer, DSInfo>.
Renamed getRsList() to getRsInfos().
Extracted method toRSInfos().

ReplicationBrokerTest.java: ADDED
Added to test new ReplicationBroker.Topology class.


DSInfo.java:
Changed equals(Set<String>, Set<String>) to equals(Object, Object).
In toString(), hid the assured fields if assured replication is off.

RSInfo.java:
Renamed fields id and serverUrl to rsServerId and rsServerURL.
In toString(), relied on the compiler to generate the String.


TopologyMsg.java:
Renamed fields dsList and rsList to replicaInfos and rsInfos.
Renamed getDsList() to getReplicaInfos() + changed return type from List<DSInfo> to Map<Integer, DSInfo>.
Renamed getRsList() to getRsInfos().
Extracted methods readStrings() and writeStrings().
Code cleanup
Used javadocs


ReplicationDomain.java, replication*.properties:
Extracted class ECLIncludes to replace eclIncludesLock, eclIncludesByServer, eclIncludesAllServers, eclIncludesForDeletesByServer, eclIncludesForDeletesAllServers + moved setEclIncludes() implementation to this class + removed synchronized blocks from all the getters.
Added baseDN to ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.
Inlined initializeRemote().
Removed unused initializeFromRemote().
Reduced methods visibility.
Renamed getReplicaList() to getReplicaInfos() + changed return type from List<DSInfo> to Map<Integer, DSInfo>.
Renamed getRsList() to getRsInfos().
In initializeRemote(), waitForRemoteEndOfInit(), isRemoteDSConnected() and getProtocolVersion(), simplified code.

*.java:
Consequence of the changes above.
Simplified code in a number of places, particularly in the tests.
1 files added
21 files modified
1695 ■■■■■ changed files
opendj3-server-dev/src/messages/messages/replication.properties 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/messages/messages/replication_de.properties 3 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/messages/messages/replication_es.properties 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/messages/messages/replication_fr.properties 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java 9 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java 41 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java 256 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 6 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java 512 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java 335 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 8 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java 16 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java 5 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java 81 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 4 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 72 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java 268 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 65 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/messages/messages/replication.properties
@@ -347,7 +347,7 @@
 Bad msg id sequence during import. Expected:%s Actual:%s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\
 The following servers did not acknowledge initialization in the expected \
 time. They are potentially down or too slow. Servers list: %s
 time for domain %s. They are potentially down or too slow. Servers list: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\
 The following servers did not end initialization being connected with the \
 right generation (%s). They are potentially stopped or too slow. \
opendj3-server-dev/src/messages/messages/replication_de.properties
@@ -123,6 +123,7 @@
ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND_110=Die Konfiguration des Replikationsservers konnte nicht gefunden werden
DEBUG_GOING_TO_SEARCH_FOR_CHANGES_111=Der Replikationsserver ist hinsichtlich unserer \u00c4nderungen versp\u00e4tet: fehlende \u00c4nderungen werden gesendet
DEBUG_CHANGES_SENT_113=Alle fehlenden \u00c4nderungen wurden an den Replikationsserver gesendet
<<<<<<< .working
ERR_PUBLISHING_FAKE_OPS_114=Aufgefangene Ausnahme ver\u00f6ffentlicht Scheinvorg\u00e4nge f\u00fcr Dom\u00e4ne %s : %s
ERR_COMPUTING_FAKE_OPS_115=Aufgefangene Ausnahme berechnet Scheinvorg\u00e4nge f\u00fcr Dom\u00e4ne %s f\u00fcr Replikationsserver %s : %s
NOTE_SERVER_STATE_RECOVERY_117=ServerState-Wiederherstellung f\u00fcr Dom\u00e4ne %s, aktualisiert mit changeNumber %s
@@ -182,7 +183,7 @@
ERR_INIT_IMPORT_FAILURE_190=W\u00e4hrend der Initialisierung von einem Remote-Server ist der folgende Fehler aufgetreten: %s
ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Verbindungsfehler mit Replikationsserver %s w\u00e4hrend Import
ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Ung\u00fcltige Meldungs-ID-Sequenz w\u00e4hrend Import. Erwartet: %s Tats\u00e4chlich: %s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt f\u00fcr Dom\u00e4ne %s. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Die folgenden Server haben die Initialisierung nicht in Verbindung mit der entsprechenden Generation (%s) beendet. Sie wurden gestoppt oder sind zu langsam. Server-Liste: %s
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Verbindung zu Replikationsserver mit Server-ID=%s w\u00e4hrend Initialisierung von Remote-Server(n) unterbrochen
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Initialisierter Server mit Server-ID=%s bei Initialisierung von Remote-Server(n) gestoppt oder zu langsam
opendj3-server-dev/src/messages/messages/replication_es.properties
@@ -182,7 +182,7 @@
ERR_INIT_IMPORT_FAILURE_190=Durante la inicializaci\u00f3n desde un servidor remoto, se ha producido el siguiente error: %s
ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Error de conexi\u00f3n con el Servidor de repetici\u00f3n %s durante la importaci\u00f3n
ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Secuencia de Id. de mensaje incorrecto durante la importaci\u00f3n. Se requiere:%s Real:%s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado para el ND de base %s. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Los siguientes servidores no finalizaron la inicializaci\u00f3n estando conectados con la generaci\u00f3n correcta (%s). Posiblemente est\u00e1n detenidos o son demasiado lentos. Lista de servidores: %s
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Al inicializar los servidores remotos, se ha perdido la conexi\u00f3n con el Servidor de repetici\u00f3n con Id. de servidor=%s
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Al inicializar los servidores remotos, el servidor inicializado con Id. de servidor=%s posiblemente se ha detenido o es demasiado lento
opendj3-server-dev/src/messages/messages/replication_fr.properties
@@ -182,7 +182,7 @@
ERR_INIT_IMPORT_FAILURE_190=L'erreur suivante s'est produite lors de l'initialisation \u00e0 partir d'un serveur distant : %s
ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\u00c9chec de la connexion au serveur de r\u00e9plication %s lors de l'importation
ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Mauvaise s\u00e9quence d'ID de message lors de l'importation. Attendu\u00a0: %s Re\u00e7u\u00a0: %s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu pour le domaine %s. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Les serveurs suivants n'ont pas termin\u00e9 l'initialisation en cours de connexion avec la bonne g\u00e9n\u00e9ration (%s). Ils sont probablement arr\u00eat\u00e9s ou trop lents. Liste des serveurs\u00a0: %s
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=La connexion au serveur de r\u00e9plication ayant l'identifiant serverId=%s a \u00e9t\u00e9 interrompue lors de l'initalisation du/des serveur(s) distant(s)
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Le serveur initialis\u00e9 ayant l'identifiant serverId=%s \u00e9tait probablement arr\u00eat\u00e9 ou trop lent lors de l'initialisation du/des serveur(s) distant(s)
opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
@@ -283,9 +283,9 @@
        && equals(eclIncludesForDeletes, dsInfo.getEclIncludesForDeletes());
  }
  private boolean equals(Set<String> o1, Set<String> o2)
  private boolean equals(Object o1, Object o2)
  {
    return (o1 == null && o2 == null) || (o1 != null && o1.equals(o2));
    return o1 == null ? o2 == null : o1.equals(o2);
  }
  /**
@@ -320,15 +320,18 @@
  @Override
  public String toString()
  {
    StringBuilder sb = new StringBuilder();
    final StringBuilder sb = new StringBuilder();
    sb.append("DS id: ").append(dsId);
    sb.append(" ; DS url: ").append(dsUrl);
    sb.append(" ; RS id: ").append(rsId);
    sb.append(" ; Generation id: ").append(generationId);
    sb.append(" ; Status: ").append(status);
    sb.append(" ; Assured replication: ").append(assuredFlag);
    if (assuredFlag)
    {
    sb.append(" ; Assured mode: ").append(assuredMode);
    sb.append(" ; Safe data level: ").append(safeDataLevel);
    }
    sb.append(" ; Group id: ").append(groupId);
    sb.append(" ; Protocol version: ").append(protocolVersion);
    sb.append(" ; Referral URLs: ").append(refUrls);
opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2012-2013 ForgeRock AS
 *      Portions Copyright 2012-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
@@ -36,7 +36,7 @@
public final class RSInfo
{
  /** Server id of the RS. */
  private final int id;
  private final int rsServerId;
  /** Generation Id of the RS. */
  private final long generationId;
  /** Group id of the RS. */
@@ -50,22 +50,22 @@
   */
  private final int weight;
  /** The server URL of the RS. */
  private final String serverUrl;
  private final String rsServerURL;
  /**
   * Creates a new instance of RSInfo with every given info.
   *
   * @param id The RS id
   * @param serverUrl Url of the RS
   * @param rsServerId The RS id
   * @param rsServerURL Url of the RS
   * @param generationId The generation id the RS is using
   * @param groupId RS group id
   * @param weight RS weight
   */
  public RSInfo(int id, String serverUrl,
  public RSInfo(int rsServerId, String rsServerURL,
    long generationId, byte groupId, int weight)
  {
    this.id = id;
    this.serverUrl = serverUrl;
    this.rsServerId = rsServerId;
    this.rsServerURL = rsServerURL;
    this.generationId = generationId;
    this.groupId = groupId;
    this.weight = weight;
@@ -77,7 +77,7 @@
   */
  public int getId()
  {
    return id;
    return rsServerId;
  }
  /**
@@ -125,12 +125,10 @@
      return false;
    }
    final RSInfo rsInfo = (RSInfo) obj;
    return id == rsInfo.getId()
    return rsServerId == rsInfo.getId()
        && generationId == rsInfo.getGenerationId()
        && groupId == rsInfo.getGroupId()
        && weight == rsInfo.getWeight()
        && ((serverUrl == null && rsInfo.getServerUrl() == null)
            || (serverUrl != null && serverUrl.equals(rsInfo.getServerUrl())));
        && weight == rsInfo.getWeight();
  }
  /**
@@ -141,11 +139,10 @@
  public int hashCode()
  {
    int hash = 7;
    hash = 17 * hash + this.id;
    hash = 17 * hash + this.rsServerId;
    hash = 17 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
    hash = 17 * hash + this.groupId;
    hash = 17 * hash + this.weight;
    hash = 17 * hash + (this.serverUrl != null ? this.serverUrl.hashCode() : 0);
    return hash;
  }
@@ -155,7 +152,7 @@
   */
  public String getServerUrl()
  {
    return serverUrl;
    return rsServerURL;
  }
  /**
@@ -165,12 +162,10 @@
  @Override
  public String toString()
  {
    StringBuilder sb = new StringBuilder();
    sb.append("Id: ").append(id);
    sb.append(" ; Server URL: ").append(serverUrl);
    sb.append(" ; Generation id: ").append(generationId);
    sb.append(" ; Group id: ").append(groupId);
    sb.append(" ; Weight: ").append(weight);
    return sb.toString();
    return "RS id: " + rsServerId
        + " ; RS URL: " + rsServerURL
        + " ; Generation id: " + generationId
        + " ; Group id: " + groupId
        + " ; Weight: " + weight;
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -38,15 +38,14 @@
import org.opends.server.replication.common.ServerStatus;
/**
 *
 * This class defines a message that is sent:
 * - By a RS to the other RSs in the topology, containing:
 *   - the list of DSs directly connected to the RS in the DS list
 *   - only this RS in the RS list
 *   - the DSs directly connected to the RS in the DS infos
 *   - only this RS in the RS infos
 * - By a RS to his connected DSs, containing every DSs and RSs he knows.
 * In that case the message contains:
 *   - the list of every DS the RS knows except the destinator DS in the DS list
 *   - the list of every connected RSs (including the sending RS) in the RS list
 *   - every DSs the RS knows except the destinator DS in the DS infos
 *   - every connected RSs (including the sending RS) in the RS infos
 *
 * Exchanging these messages allows to have each RS or DS take
 * appropriate decisions according to the current topology:
@@ -56,10 +55,10 @@
 */
public class TopologyMsg extends ReplicationMsg
{
  // Information for the DS known in the topology
  private final List<DSInfo> dsList;
  // Information for the RS known in the topology
  private final List<RSInfo> rsList;
  /** Information for the DSs (aka replicas) known in the topology. */
  private final Map<Integer, DSInfo> replicaInfos;
  /** Information for the RSs known in the topology. */
  private final List<RSInfo> rsInfos;
  /**
   * Creates a new changelogInfo message from its encoded form.
@@ -77,18 +76,18 @@
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
          "Input is not a valid " + getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
      while ( (nDsInfo > 0) && (pos < in.length) )
      Map<Integer, DSInfo> replicaInfos =
          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
      while (nDsInfo > 0 && pos < in.length)
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
@@ -110,26 +109,21 @@
        }
        /* Read RS id */
        length =
          getNextLength(in, pos);
        serverIdString =
          new String(in, pos, length, "UTF-8");
        length = getNextLength(in, pos);
        serverIdString = new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
        /* Read DS assured flag */
        boolean assuredFlag;
        assuredFlag = in[pos++] == 1;
        boolean assuredFlag = in[pos++] == 1;
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -142,50 +136,18 @@
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        byte nUrls = in[pos++];
        byte nRead = 0;
        /* Read urls until expected number read */
        while ((nRead != nUrls) &&
          (pos < in.length) //security
          )
        {
          length = getNextLength(in, pos);
          String url = new String(in, pos, length, "UTF-8");
          refUrls.add(url);
          pos += length + 1;
          nRead++;
        }
        pos = readStrings(in, pos, refUrls);
        Set<String> attrs = new HashSet<String>();
        Set<String> delattrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byte nAttrs = in[pos++];
          nRead = 0;
          /* Read attrs until expected number read */
          while ((nRead != nAttrs) && (pos < in.length))
          {
            length = getNextLength(in, pos);
            String attr = new String(in, pos, length, "UTF-8");
            attrs.add(attr);
            pos += length + 1;
            nRead++;
          }
          pos = readStrings(in, pos, attrs);
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            nAttrs = in[pos++];
            nRead = 0;
            /* Read attrs until expected number read */
            while ((nRead != nAttrs) && (pos < in.length))
            {
              length = getNextLength(in, pos);
              String attr = new String(in, pos, length, "UTF-8");
              delattrs.add(attr);
              pos += length + 1;
              nRead++;
            }
            pos = readStrings(in, pos, delattrs);
          }
          else
          {
@@ -194,26 +156,23 @@
          }
          /* Read Protocol version */
          protocolVersion = (short)in[pos++];
          protocolVersion = in[pos++];
        }
        /* Now create DSInfo and store it in list */
        DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status,
          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
          delattrs, protocolVersion);
        dsList.add(dsInfo);
        /* Now create DSInfo and store it */
        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
            attrs, delattrs, protocolVersion));
        nDsInfo--;
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while ( (nRsInfo > 0) && (pos < in.length) )
      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while (nRsInfo > 0 && pos < in.length)
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
@@ -223,9 +182,7 @@
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
@@ -245,47 +202,67 @@
          pos += length + 1;
        }
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
          weight);
        rsList.add(rsInfo);
        /* Now create RSInfo and store it */
        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
        nRsInfo--;
      }
      this.dsList = Collections.unmodifiableList(dsList);
      this.rsList = Collections.unmodifiableList(rsList);
    } catch (UnsupportedEncodingException e)
      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
      this.rsInfos = Collections.unmodifiableList(rsInfos);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Creates a new  message from a list of the currently connected servers.
   *
   * @param dsList The list of currently connected DS servers ID.
   * @param rsList The list of currently connected RS servers ID.
   */
  public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
      throws DataFormatException, UnsupportedEncodingException
  {
    if (dsList == null || dsList.isEmpty())
    byte nAttrs = in[pos++];
    byte nRead = 0;
    // Read all elements until expected number read
    while (nRead != nAttrs && pos < in.length)
    {
      this.dsList = Collections.emptyList();
      int length = getNextLength(in, pos);
      outputCol.add(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      nRead++;
    }
    else
    {
      this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
    return pos;
    }
    if (rsList == null || rsList.isEmpty())
  /**
   * Creates a new  message of the currently connected servers.
   *
   * @param dsInfos The collection of currently connected DS servers ID.
   * @param rsInfos The list of currently connected RS servers ID.
   */
  public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
    {
      this.rsList = Collections.emptyList();
    if (dsInfos == null || dsInfos.isEmpty())
    {
      this.replicaInfos = Collections.emptyMap();
    }
    else
    {
      this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
      Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>();
      for (DSInfo dsInfo : dsInfos)
      {
        replicas.put(dsInfo.getDsId(), dsInfo);
      }
      this.replicaInfos = Collections.unmodifiableMap(replicas);
    }
    if (rsInfos == null || rsInfos.isEmpty())
    {
      this.rsInfos = Collections.emptyList();
    }
    else
    {
      this.rsInfos =
          Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
    }
  }
@@ -293,12 +270,9 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  throws UnsupportedEncodingException
  public byte[] getBytes(short version) throws UnsupportedEncodingException
  {
    try
    {
@@ -313,10 +287,10 @@
      oStream.write(MSG_TYPE_TOPOLOGY);
      // Put number of following DS info entries
      oStream.write((byte)dsList.size());
      oStream.write((byte) replicaInfos.size());
      // Put DS info
      for (DSInfo dsInfo : dsList)
      for (DSInfo dsInfo : replicaInfos.values())
      {
        // Put DS id
        byte[] byteServerId =
@@ -330,8 +304,7 @@
          oStream.write(0);
        }
        // Put RS id
        byteServerId =
          String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
@@ -349,36 +322,16 @@
        // Put DS group id
        oStream.write(dsInfo.getGroupId());
        List<String> refUrls = dsInfo.getRefUrls();
        // Put number of following URLs as a byte
        oStream.write(refUrls.size());
        for (String url : refUrls)
        {
          // Write the url and a 0 terminating byte
          oStream.write(url.getBytes("UTF-8"));
          oStream.write(0);
        }
        writeStrings(oStream, dsInfo.getRefUrls());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          Set<String> attrs = dsInfo.getEclIncludes();
          oStream.write(attrs.size());
          for (String attr : attrs)
          {
            oStream.write(attr.getBytes("UTF-8"));
            oStream.write(0);
          }
          writeStrings(oStream, dsInfo.getEclIncludes());
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            Set<String> delattrs = dsInfo.getEclIncludesForDeletes();
            oStream.write(delattrs.size());
            for (String attr : delattrs)
            {
              oStream.write(attr.getBytes("UTF-8"));
              oStream.write(0);
            }
            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
          }
          oStream.write(dsInfo.getProtocolVersion());
@@ -386,10 +339,10 @@
      }
      // Put number of following RS info entries
      oStream.write((byte)rsList.size());
      oStream.write((byte) rsInfos.size());
      // Put RS info
      for (RSInfo rsInfo : rsList)
      for (RSInfo rsInfo : rsInfos)
      {
        // Put RS id
        byte[] byteServerId =
@@ -422,54 +375,65 @@
      // never happens
      throw new RuntimeException(e);
    }
  }
  private void writeStrings(ByteArrayOutputStream oStream,
      Collection<String> col) throws IOException, UnsupportedEncodingException
  {
    // Put collection length as a byte
    oStream.write(col.size());
    for (String elem : col)
    {
      // Write the element and a 0 terminating byte
      oStream.write(elem.getBytes("UTF-8"));
      oStream.write(0);
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    String dsStr = "";
    for (DSInfo dsInfo : dsList)
    for (DSInfo dsInfo : replicaInfos.values())
    {
      dsStr += dsInfo.toString() + "\n----------------------------\n";
      dsStr += dsInfo + "\n----------------------------\n";
    }
    String rsStr = "";
    for (RSInfo rsInfo : rsList)
    for (RSInfo rsInfo : rsInfos)
    {
      rsStr += rsInfo.toString() + "\n----------------------------\n";
      rsStr += rsInfo + "\n----------------------------\n";
    }
    return ("TopologyMsg content: "
    return "TopologyMsg content:"
      + "\n----------------------------"
      + "\nCONNECTED DS SERVERS:"
      + "\n--------------------\n"
      + dsStr
      + "CONNECTED RS SERVERS:"
      + "\n--------------------\n"
      + rsStr + (rsStr.equals("") ? "----------------------------\n" : ""));
      + rsStr
      + (rsStr.equals("") ? "----------------------------\n" : "");
  }
  /**
   * Get the list of DS info.
   * @return The list of DS info
   * Get the DS infos.
   *
   * @return The DS infos
   */
  public List<DSInfo> getDsList()
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return dsList;
    return replicaInfos;
  }
  /**
   * Get the list of RS info.
   * @return The list of RS info
   * Get the RS infos.
   *
   * @return The RS infos
   */
  public List<RSInfo> getRsList()
  public List<RSInfo> getRsInfos()
  {
    return rsList;
    return rsInfos;
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -444,7 +444,7 @@
    if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // List should only contain RS info for sender
      RSInfo rsInfo = inTopoMsg.getRsList().get(0);
      RSInfo rsInfo = inTopoMsg.getRsInfos().get(0);
      weight = rsInfo.getWeight();
    }
@@ -579,7 +579,7 @@
  public void processTopoInfoFromRS(TopologyMsg topoMsg)
  {
    // List should only contain RS info for sender
    final RSInfo rsInfo = topoMsg.getRsList().get(0);
    final RSInfo rsInfo = topoMsg.getRsInfos().get(0);
    generationId = rsInfo.getGenerationId();
    groupId = rsInfo.getGroupId();
    weight = rsInfo.getWeight();
@@ -589,7 +589,7 @@
      clearRemoteLSHandlers();
      // Creates the new structure according to the message received.
      for (DSInfo dsInfo : topoMsg.getDsList())
      for (DSInfo dsInfo : topoMsg.getReplicaInfos().values())
      {
        // For each DS connected to the peer RS
        DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId);
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -173,11 +173,7 @@
  // @NotNull // for the reference
  private final AtomicReference<ConnectedRS> connectedRS =
      new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
  /**
   * Our replication domain.
   * <p>
   * Can be null for unit test purpose.
   */
  /** Our replication domain. */
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
@@ -217,25 +213,14 @@
  /*
   * Properties for the last topology info received from the network.
   */
  /**
   * Info for other DSs.
   * <p>
   * Warning: does not contain info for us (for our server id)
   */
  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
  private volatile long generationID;
  /** Contains the last known state of the replication topology. */
  private final AtomicReference<Topology> topology =
      new AtomicReference<Topology>(new Topology());
  /** <pre>@GuardedBy("this")</pre>. */
  private volatile int updateDoneCount = 0;
  private volatile boolean connectRequiresRecovery = false;
  /**
   * The map of replication server info initialized at connection time and
   * regularly updated. This is used to decide to which best suitable
   * replication server one wants to connect. Key: replication server id Value:
   * replication server info for the matching replication server id
   */
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
@@ -266,19 +251,16 @@
   * @param state The ServerState that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param config The configuration to use.
   * @param generationId The generationId for the server associated to the
   * provided serverId and for the domain associated to the provided baseDN.
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
      ServerState state, ReplicationDomainCfg config, long generationId,
      ServerState state, ReplicationDomainCfg config,
      ReplSessionSecurity replSessionSecurity)
  {
    this.domain = replicationDomain;
    this.state = state;
    this.config = config;
    this.replSessionSecurity = replSessionSecurity;
    this.generationID = generationId;
    this.rcvWindow = getMaxRcvWindow();
    this.halfRcvWindow = rcvWindow / 2;
@@ -352,8 +334,7 @@
   */
  private long getGenerationID()
  {
    generationID = domain.getGenerationID();
    return generationID;
    return domain.getGenerationID();
  }
  /**
@@ -362,38 +343,7 @@
   */
  public void setGenerationID(long generationID)
  {
    this.generationID = generationID;
  }
  /**
   * Sets the locally configured flag for the passed ReplicationServerInfo
   * object, analyzing the local configuration.
   * @param rsInfo the Replication server to check and update
   */
  private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
  {
    // Determine if the passed ReplicationServerInfo has a URL that is present
    // in the locally configured replication servers
    String rsUrl = rsInfo.getServerURL();
    if (rsUrl == null)
    {
      // The ReplicationServerInfo has been generated from a server with
      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
      // ignore this server as we do not know how to connect to it
      rsInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : getReplicationServerUrls())
    {
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
        // This RS is locally configured, mark this
        rsInfo.setLocallyConfigured(true);
        rsInfo.setServerURL(serverUrl);
        return;
      }
    }
    rsInfo.setLocallyConfigured(false);
    domain.setGenerationID(generationID);
  }
  /**
@@ -733,8 +683,10 @@
    @Override
    public String toString()
    {
      return "Url:" + getServerURL() + " ServerId:" + getServerId()
          + " GroupId:" + getGroupId();
      return "ReplServerInfo Url:" + getServerURL()
          + " ServerId:" + getServerId()
          + " GroupId:" + getGroupId()
          + " connectedDSs:" + connectedDSs;
    }
  }
@@ -860,9 +812,11 @@
            + "elect the preferred one");
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      Map<Integer, ReplicationServerInfo> rsInfos =
          collectReplicationServersInfo();
      computeNewTopology(toRSInfos(rsInfos));
      if (replicationServerInfos.isEmpty())
      if (rsInfos.isEmpty())
      {
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
@@ -870,7 +824,7 @@
      {
        // At least one server answered, find the best one.
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            replicationServerInfos, serverId, getGroupId(), getGenerationID());
            rsInfos, serverId, getGroupId(), getGenerationID());
        // Best found, now initialize connection to this one (handshake phase 1)
        if (logger.isTraceEnabled())
@@ -886,8 +840,7 @@
          Update replication server info with potentially more up to date
          data (server state for instance may have changed)
          */
          replicationServerInfos
              .put(electedRsInfo.getServerId(), electedRsInfo);
          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);
          // Handshake phase 1 exchange went well
@@ -935,10 +888,10 @@
          connectionError = true;
          connectPhaseLock.notify();
          if (replicationServerInfos.size() > 0)
          if (rsInfos.size() > 0)
          {
            logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN.toNormalizedString(),
                Utils.joinAsString(", ", replicationServerInfos.keySet()));
                Utils.joinAsString(", ", rsInfos.keySet()));
          }
          else
          {
@@ -949,6 +902,43 @@
    }
  }
  private void computeNewTopology(List<RSInfo> newRSInfos)
  {
    final int rsServerId = getRsServerId();
    Topology oldTopo;
    Topology newTopo;
    do
    {
      oldTopo = topology.get();
      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
    }
    while (!topology.compareAndSet(oldTopo, newTopo));
    if (logger.isTraceEnabled())
    {
      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
    }
  }
  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
      Topology newTopo)
  {
    final StringBuilder sb = new StringBuilder();
    sb.append("rsServerId=").append(rsServerId);
    if (newTopo.equals(oldTopo))
    {
      sb.append(", unchangedTopology=").append(newTopo);
    }
    else
    {
      sb.append(", oldTopology=").append(oldTopo);
      sb.append(", newTopology=").append(newTopo);
    }
    return sb;
  }
  /**
   * Connects to a replication server.
   *
@@ -2303,7 +2293,7 @@
    if (logger.isTraceEnabled())
    {
      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
          + rs.getServerId() + ") genId=" + generationID);
          + rs.getServerId() + ") genId=" + getGenerationID());
    }
  }
@@ -2408,7 +2398,8 @@
          */
          credit =
            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
        } else
        }
        else
        {
          credit = true;
        }
@@ -2451,6 +2442,11 @@
      }
      catch (IOException e)
      {
        if (logger.isTraceEnabled())
        {
          debugInfo("publish(): IOException caught: "
              + stackTraceToSingleLineString(e));
        }
        if (!retryOnFailure)
        {
          return false;
@@ -2463,23 +2459,24 @@
          try
          {
            connectPhaseLock.wait(100);
          } catch (InterruptedException e1)
          }
          catch (InterruptedException ignored)
          {
            // ignore
            if (logger.isTraceEnabled())
            {
              debugInfo("publish(): Interrupted exception raised : "
                  + e.getLocalizedMessage());
              debugInfo("publish(): InterruptedException caught 1: "
                  + stackTraceToSingleLineString(ignored));
            }
          }
        }
      } catch (InterruptedException e)
      }
      catch (InterruptedException ignored)
      {
        // just loop.
        if (logger.isTraceEnabled())
        {
          debugInfo("publish(): Interrupted exception raised."
              + e.getLocalizedMessage());
          debugInfo("publish(): InterruptedException caught 2: "
              + stackTraceToSingleLineString(ignored));
        }
      }
    }
@@ -2607,9 +2604,10 @@
          }
          // Update the replication servers ServerStates with new received info
          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
          for (int srvId : toIterable(monitorMsg.rsIterator()))
          {
            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
            if (rsInfo != null)
            {
              rsInfo.update(monitorMsg.getRSServerState(srvId));
@@ -2629,9 +2627,9 @@
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              final RSEvaluations evals =
                  computeBestReplicationServer(false, previousRsServerID, state,
                  replicationServerInfos, serverId, getGroupId(), generationID);
              final RSEvaluations evals = computeBestReplicationServer(
                  false, previousRsServerID, state,
                  rsInfos, serverId, getGroupId(), getGenerationID());
              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
              if (previousRsServerID != -1
                  && (bestServerInfo == null
@@ -2951,9 +2949,9 @@
   * Gets the info for DSs in the topology (except us).
   * @return The info for DSs in the topology (except us)
   */
  public List<DSInfo> getDsList()
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return dsList;
    return topology.get().replicaInfos;
  }
  /**
@@ -2962,10 +2960,15 @@
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  public List<RSInfo> getRsInfos()
  {
    return toRSInfos(topology.get().rsInfos);
  }
  private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
  {
    final List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
    for (ReplicationServerInfo rsInfo : rsInfos.values())
    {
      result.add(rsInfo.toRSInfo());
    }
@@ -2973,39 +2976,6 @@
  }
  /**
   * Computes the list of DSs connected to a particular RS.
   * @param rsId The RS id of the server one wants to know the connected DSs
   * @param dsList The list of DSinfo from which to compute things
   * @param rsServerId the serverId to use for the connectedDS
   * @return The list of connected DSs to the server rsId
   */
  private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
      int rsServerId)
  {
    final Set<Integer> connectedDSs = new HashSet<Integer>();
    if (rsServerId == rsId)
    {
      /*
      If we are computing connected DSs for the RS we are connected
      to, we should count the local DS as the DSInfo of the local DS is not
      sent by the replication server in the topology message. We must count
      ourselves as a connected server.
      */
      connectedDSs.add(getServerId());
    }
    for (DSInfo dsInfo : dsList)
    {
      if (dsInfo.getRsId() == rsId)
      {
        connectedDSs.add(dsInfo.getDsId());
      }
    }
    return connectedDSs;
  }
  /**
   * Processes an incoming TopologyMsg.
   * Updates the structures for the local view of the topology.
   *
@@ -3016,42 +2986,298 @@
   */
  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
  {
    if (logger.isTraceEnabled())
      debugInfo("receive TopologyMsg=" + topoMsg);
    // Store new DS list
    dsList = topoMsg.getDsList();
    // Update replication server info list with the received topology
    // information
    final Set<Integer> rssToKeep = new HashSet<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
    for (DSInfo dsInfo : newTopo.replicaInfos.values())
    {
      final int rsId = rsInfo.getId();
      domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
          .getEclIncludesForDeletes());
    }
  }
  private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
  {
    Topology oldTopo;
    Topology newTopo;
    do
    {
      oldTopo = topology.get();
      newTopo = new Topology(topoMsg, getServerId(), rsServerId,
              getReplicationServerUrls(), oldTopo.rsInfos);
    }
    while (!topology.compareAndSet(oldTopo, newTopo));
    if (logger.isTraceEnabled())
    {
      final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
      sb.append(" received TopologyMsg=").append(topoMsg);
      debugInfo(sb);
    }
    return newTopo;
  }
  /**
   * Contains the last known state of the replication topology.
   */
  static final class Topology
  {
    /**
     * The RS's serverId that this DS was connected to when this topology state
     * was computed.
     */
    private final int rsServerId;
    /**
     * Info for other DSs.
     * <p>
     * Warning: does not contain info for us (for our server id)
     */
    final Map<Integer, DSInfo> replicaInfos;
    /**
     * The map of replication server info initialized at connection time and
     * regularly updated. This is used to decide to which best suitable
     * replication server one wants to connect. Key: replication server id
     * Value: replication server info for the matching replication server id
     */
    final Map<Integer, ReplicationServerInfo> rsInfos;
    private Topology()
    {
      this.rsServerId = -1;
      this.replicaInfos = Collections.emptyMap();
      this.rsInfos = Collections.emptyMap();
    }
    /**
     * Constructor to use when only the RSInfos need to be recomputed.
     *
     * @param dsInfosToKeep
     *          the DSInfos that will be stored as is
     * @param newRSInfos
     *          the new RSInfos from which to compute the new topology
     * @param dsServerId
     *          the DS serverId
     * @param rsServerId
     *          the current connected RS serverId
     * @param configuredReplicationServerUrls
     *          the configured replication server URLs
     * @param previousRsInfos
     *          the RSInfos computed in the previous Topology object
     */
    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
        int dsServerId, int rsServerId,
        Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      this.rsServerId = rsServerId;
      this.replicaInfos = dsInfosToKeep;
      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }
    /**
     * Constructor to use when a new TopologyMsg has been received.
     *
     * @param topoMsg
     *          the topology message containing the new DSInfos and RSInfos from
     *          which to compute the new topology
     * @param dsServerId
     *          the DS serverId
     * @param rsServerId
     *          the current connected RS serverId
     * @param configuredReplicationServerUrls
     *          the configured replication server URLs
     * @param previousRsInfos
     *          the RSInfos computed in the previous Topology object
     */
    Topology(TopologyMsg topoMsg, int dsServerId,
        int rsServerId, Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      this.rsServerId = rsServerId;
      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
          previousRsInfos, configuredReplicationServerUrls);
    }
    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
        int dsServerId)
    {
      final Map<Integer, DSInfo> copy = new HashMap<Integer, DSInfo>(dsInfos);
      copy.remove(dsServerId);
      return Collections.unmodifiableMap(copy);
    }
    private Map<Integer, ReplicationServerInfo> computeRSInfos(
        int dsServerId, List<RSInfo> newRsInfos,
        Map<Integer, ReplicationServerInfo> previousRsInfos,
        Set<String> configuredReplicationServerUrls)
    {
      final Map<Integer, ReplicationServerInfo> results =
          new HashMap<Integer, ReplicationServerInfo>(previousRsInfos);
      // Update replication server info list with the received topology info
      final Set<Integer> rssToKeep = new HashSet<Integer>();
      for (RSInfo newRSInfo : newRsInfos)
      {
        final int rsId = newRSInfo.getId();
      rssToKeep.add(rsId); // Mark this server as still existing
      Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
      ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
      if (rsInfo2 == null)
        Set<Integer> connectedDSs =
            computeDSsConnectedTo(rsId, dsServerId);
        ReplicationServerInfo rsInfo = results.get(rsId);
        if (rsInfo == null)
      {
        // New replication server, create info for it add it to the list
        rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
        setLocallyConfiguredFlag(rsInfo2);
        replicationServerInfos.put(rsId, rsInfo2);
          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
          results.put(rsId, rsInfo);
      }
      else
      {
        // Update the existing info for the replication server
        rsInfo2.update(rsInfo, connectedDSs);
          rsInfo.update(newRSInfo, connectedDSs);
      }
    }
    // Remove any replication server that may have disappeared from the topology
    replicationServerInfos.keySet().retainAll(rssToKeep);
      // Remove any replication server that may have disappeared from the
      // topology
      results.keySet().retainAll(rssToKeep);
    for (DSInfo info : dsList)
      return Collections.unmodifiableMap(results);
    }
    /** Computes the list of DSs connected to a particular RS. */
    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
    {
      domain.setEclIncludes(info.getDsId(), info.getEclIncludes(),
          info.getEclIncludesForDeletes());
      final Set<Integer> connectedDSs = new HashSet<Integer>();
      if (rsServerId == rsId)
      {
        /*
         * If we are computing connected DSs for the RS we are connected to, we
         * should count the local DS as the DSInfo of the local DS is not sent
         * by the replication server in the topology message. We must count
         * ourselves as a connected server.
         */
        connectedDSs.add(dsServerId);
      }
      for (DSInfo dsInfo : replicaInfos.values())
      {
        if (dsInfo.getRsId() == rsId)
        {
          connectedDSs.add(dsInfo.getDsId());
        }
      }
      return connectedDSs;
    }
    /**
     * Sets the locally configured flag for the passed ReplicationServerInfo
     * object, analyzing the local configuration.
     *
     * @param rsInfo
     *          the Replication server to check and update
     * @param configuredReplicationServerUrls
     */
    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
        Set<String> configuredReplicationServerUrls)
    {
      // Determine if the passed ReplicationServerInfo has a URL that is present
      // in the locally configured replication servers
      String rsUrl = rsInfo.getServerURL();
      if (rsUrl == null)
      {
        // The ReplicationServerInfo has been generated from a server with
        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
        // ignore this server as we do not know how to connect to it
        rsInfo.setLocallyConfigured(false);
        return;
      }
      for (String serverUrl : configuredReplicationServerUrls)
      {
        if (isSameReplicationServerUrl(serverUrl, rsUrl))
        {
          // This RS is locally configured, mark this
          rsInfo.setLocallyConfigured(true);
          rsInfo.setServerURL(serverUrl);
          return;
        }
      }
      rsInfo.setLocallyConfigured(false);
    }
    /** {@inheritDoc} */
    @Override
    public boolean equals(Object obj)
    {
      if (this == obj)
      {
        return true;
      }
      if (obj == null || getClass() != obj.getClass())
      {
        return false;
      }
      final Topology other = (Topology) obj;
      return rsServerId == other.rsServerId
          && equals(replicaInfos, other.replicaInfos)
          && equals(rsInfos, other.rsInfos)
          && urlsEqual1(replicaInfos, other.replicaInfos)
          && urlsEqual2(rsInfos, other.rsInfos);
    }
    private boolean equals(Object o1, Object o2)
    {
      return o1 == null ? o2 == null : o1.equals(o2);
    }
    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
        Map<Integer, DSInfo> replicaInfos2)
    {
      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
      {
        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
        if (!equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
        {
          return false;
        }
      }
      return true;
    }
    private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
        Map<Integer, ReplicationServerInfo> rsInfos2)
    {
      for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
      {
        ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
        if (!equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
        {
          return false;
        }
      }
      return true;
    }
    /** {@inheritDoc} */
    @Override
    public int hashCode()
    {
      final int prime = 31;
      int result = 1;
      result = prime * result + rsServerId;
      result = prime * result
          + (replicaInfos == null ? 0 : replicaInfos.hashCode());
      result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
      return result;
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
          + ", rsInfos=" + rsInfos.values();
    }
  }
@@ -3197,15 +3423,15 @@
      .append(" \"").append(getBaseDN()).append(" ")
      .append(getServerId()).append("\",")
      .append(" groupId=").append(getGroupId())
      .append(", genId=").append(generationID)
      .append(", genId=").append(getGenerationID())
      .append(", ");
    connectedRS.get().toString(sb);
    return sb.toString();
  }
  private void debugInfo(String message)
  private void debugInfo(CharSequence message)
  {
    logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
        + " and serverId=" + getServerId() + " " + message);
        + " and serverId=" + getServerId() + ": " + message);
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -95,15 +95,118 @@
 * <p>
 *   Full Initialization of a replica can be triggered by LDAP clients
 *   by creating InitializeTasks or InitializeTargetTask.
 *   Full initialization can also by triggered from the ReplicationDomain
 *   implementation using methods {@link #initializeRemote(int)}
 *   or {@link #initializeFromRemote(int)}.
 *   Full initialization can also be triggered from the ReplicationDomain
 *   implementation using methods {@link #initializeRemote(int, Task)}
 *   or {@link #initializeFromRemote(int, Task)}.
 * <p>
 *   At shutdown time, the {@link #disableService()} method should be called to
 *   cleanly stop the replication service.
 */
public abstract class ReplicationDomain
{
  /**
   * Contains all the attributes included for the ECL (External Changelog).
   */
  // @Immutable
  private final static class ECLIncludes
  {
    final Map<Integer, Set<String>> includedAttrsByServer;
    final Set<String> includedAttrsAllServers;
    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
    final Set<String> includedAttrsForDeletesAllServers;
    private ECLIncludes(
        Map<Integer, Set<String>> includedAttrsByServer,
        Set<String> includedAttrsAllServers,
        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
        Set<String> includedAttrsForDeletesAllServers)
    {
      this.includedAttrsByServer = includedAttrsByServer;
      this.includedAttrsAllServers = includedAttrsAllServers;
      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
    }
    @SuppressWarnings("unchecked")
    public ECLIncludes()
    {
      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
          Collections.EMPTY_SET);
    }
    /**
     * Add attributes to be included in the ECL.
     *
     * @param serverId
     *          Server where these attributes are configured.
     * @param includeAttributes
     *          Attributes to be included with all change records, may include
     *          wild-cards.
     * @param includeAttributesForDeletes
     *          Additional attributes to be included with delete change records,
     *          may include wild-cards.
     * @return a new {@link ECLIncludes} object if included attributes have
     *         changed, or the current object otherwise.
     */
    public ECLIncludes addIncludedAttributes(int serverId,
        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
    {
      boolean configurationChanged = false;
      Set<String> s1 = new HashSet<String>(includeAttributes);
      // Combine all+delete attributes.
      Set<String> s2 = new HashSet<String>(s1);
      s2.addAll(includeAttributesForDeletes);
      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer = new HashMap<Integer, Set<String>>(
            this.includedAttrsByServer);
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      }
      Map<Integer, Set<String>> eclIncludesForDeletesByServer =
          this.includedAttrsForDeletesByServer;
      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
                this.includedAttrsForDeletesByServer);
        eclIncludesForDeletesByServer.put(
            serverId, Collections.unmodifiableSet(s2));
      }
      if (!configurationChanged)
      {
        return this;
      }
      // and rebuild the global list to be ready for usage
      Set<String> eclIncludesAllServer = new HashSet<String>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        eclIncludesAllServer.addAll(attributes);
      }
      Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
      {
        eclIncludesForDeletesAllServer.addAll(attributes);
      }
      return new ECLIncludes(eclIncludesByServer,
          Collections.unmodifiableSet(eclIncludesAllServer),
          eclIncludesForDeletesByServer,
          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
    }
  }
  /**
   * Current status for this replicated domain.
   */
@@ -251,14 +354,8 @@
   */
  private final CSNGenerator generator;
  private final Object eclIncludesLock = new Object();
  private final Map<Integer, Set<String>> eclIncludesByServer =
    new HashMap<Integer, Set<String>>();
  private Set<String> eclIncludesAllServers = Collections.emptySet();
  private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
    new HashMap<Integer, Set<String>>();
  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
  private final AtomicReference<ECLIncludes> eclIncludes =
      new AtomicReference<ECLIncludes>(new ECLIncludes());
  /**
   * An object used to protect the initialization of the underlying broker
@@ -551,9 +648,9 @@
   * Gets the info for Replicas in the topology (except us).
   * @return The info for Replicas in the topology (except us)
   */
  public List<DSInfo> getReplicasList()
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return broker.getDsList();
    return broker.getReplicaInfos();
  }
  /**
@@ -562,20 +659,13 @@
   * disconnected. Return null when no server with the provided serverId is
   * connected.
   *
   * @param  serverId The provided serverId of the remote replica
   * @param  dsId The provided serverId of the remote replica
   * @return the info related to this remote server if it is connected,
   *                  null is the server is NOT connected.
   */
  public DSInfo isRemoteDSConnected(int serverId)
  private DSInfo isRemoteDSConnected(int dsId)
  {
    for (DSInfo remoteDS : getReplicasList())
    {
      if (remoteDS.getDsId() == serverId)
      {
        return remoteDS;
      }
    }
    return null;
    return getReplicaInfos().get(dsId);
  }
  /**
@@ -601,9 +691,9 @@
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  public List<RSInfo> getRsInfos()
  {
    return broker.getRsList();
    return broker.getRsInfos();
  }
@@ -1100,7 +1190,7 @@
     *                         for and import, false if the IEContext
     *                         will be used for and export.
     */
    public IEContext(boolean importInProgress)
    private IEContext(boolean importInProgress)
    {
      this.importInProgress = importInProgress;
      this.startTime = System.currentTimeMillis();
@@ -1114,7 +1204,7 @@
     * @return A boolean indicating if a total update import is currently in
     *         Progress.
     */
    public boolean importInProgress()
    boolean importInProgress()
    {
      return importInProgress;
    }
@@ -1153,18 +1243,17 @@
      entryCount = total;
      entryLeftCount = total;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        final InitializeTask task = (InitializeTask) initializeTask;
        task.setTotal(entryCount);
        task.setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
        task.setTotal(entryCount);
        task.setLeft(entryCount);
      }
    }
@@ -1177,7 +1266,7 @@
     *
     * @throws DirectoryException if an error occurred.
     */
    public void updateCounters(int entriesDone) throws DirectoryException
    private void updateCounters(int entriesDone) throws DirectoryException
    {
      entryLeftCount -= entriesDone;
@@ -1258,7 +1347,7 @@
     * @param serverId serverId of the acknowledger/receiver/importer server.
     * @param numAck   id of the message received.
     */
    public void setAckVal(int serverId, int numAck)
    private void setAckVal(int serverId, int numAck)
    {
      if (logger.isTraceEnabled())
        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
@@ -1315,6 +1404,7 @@
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
        // JNR: Yes please
      }
      return target;
    }
@@ -1338,7 +1428,7 @@
   *
   * @param target   The server-id of the server that should be initialized.
   *                 The target can be discovered using the
   *                 {@link #getReplicasList()} method.
   *                 {@link #getReplicaInfos()} method.
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   *
@@ -1386,13 +1476,10 @@
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
          countEntries(), getBaseDNString(), getServerId());
      for (DSInfo dsi : getReplicasList())
      {
        ieCtx.startList.add(dsi.getDsId());
      }
      ieCtx.startList.addAll(getReplicaInfos().keySet());
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
@@ -1408,7 +1495,7 @@
      ieCtx.startList.add(serverToInitialize);
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
@@ -1453,7 +1540,7 @@
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(),
                  ieCtx.failureList));
        }
@@ -1472,7 +1559,7 @@
      if (logger.isTraceEnabled())
        logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
            + " export ends with " + " connected=" + broker.isConnected()
            + " export ends with connected=" + broker.isConnected()
            + " exportRootException=" + exportRootException);
      if (exportRootException != null)
@@ -1592,7 +1679,7 @@
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (logger.isTraceEnabled())
          logger.trace(
@@ -1650,10 +1737,7 @@
    considered in the processing of sorting the successfully initialized
    and the others
    */
    for (DSInfo dsi : getReplicasList())
    {
      replicasWeAreWaitingFor.add(dsi.getDsId());
    }
    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
    boolean done;
    do
@@ -1698,8 +1782,7 @@
            done = false;
            break;
          }
          else
          {
            if (dsInfo.getGenerationId() == getGenerationID())
            { // and with the expected generationId
              // We're done with this server
@@ -1707,7 +1790,6 @@
            }
          }
        }
      }
      // loop and wait
      if (!done)
@@ -1717,7 +1799,6 @@
          Thread.currentThread().interrupt();
        } // 1sec
      }
    }
    while (!done && !broker.shuttingDown()); // infinite wait
@@ -1967,7 +2048,7 @@
   *
   * @throws IOException when an error occurred.
   */
  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
  throws IOException
  {
    if (logger.isTraceEnabled())
@@ -2072,53 +2153,6 @@
  }
  /**
   * Initializes this domain from another source server.
   * <p>
   * When this method is called, a request for initialization will
   * be sent to the source server asking for initialization.
   * <p>
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on the source server, and the {@code importBackend(InputStream)}
   * will be called on his server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param source   The server-id of the source from which to initialize.
   *                 The source can be discovered using the
   *                 {@link #getReplicasList()} method.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeFromRemote(int source) throws DirectoryException
  {
    initializeFromRemote(source, null);
  }
  /**
   * Initializes a remote server from this server.
   * <p>
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on this server, and the {@code importBackend(InputStream)}
   * will be called on the remote server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param target   The server-id of the server that should be initialized.
   *                 The target can be discovered using the
   *                 {@link #getReplicasList()} method.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeRemote(int target) throws DirectoryException
  {
    initializeRemote(target, null);
  }
  /**
   * Initializes asynchronously this domain from a remote source server.
   * Before returning from this call, for the provided task :
   * - the progressing counters are updated during the initialization using
@@ -2131,7 +2165,7 @@
   *
   * @param source   The server-id of the source from which to initialize.
   *                 The source can be discovered using the
   *                 {@link #getReplicasList()} method.
   *                 {@link #getReplicaInfos()} method.
   *
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
@@ -2217,7 +2251,7 @@
   *                          task has initially been created (this server,
   *                          or the remote server).
   */
  void initialize(InitializeTargetMsg initTargetMsgReceived,
  private void initialize(InitializeTargetMsg initTargetMsgReceived,
      int requesterServerId)
  {
    InitializeTask initFromTask = null;
@@ -2254,7 +2288,6 @@
      ieCtx.importSource = source;
      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask) ieCtx.initializeTask;
@@ -2382,18 +2415,14 @@
   * @param dsServerId The provided serverId.
   * @return The protocol version.
   */
  short getProtocolVersion(int dsServerId)
  private short getProtocolVersion(int dsServerId)
  {
    short protocolVersion = -1;
    for (DSInfo dsi : getReplicasList())
    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
    if (dsInfo != null)
    {
      if (dsi.getDsId() == dsServerId)
      {
        protocolVersion = dsi.getProtocolVersion();
        break;
      return dsInfo.getProtocolVersion();
      }
    }
    return protocolVersion;
    return -1;
  }
  /**
@@ -2459,7 +2488,7 @@
    for (int i = 0; i< 50; i++)
    {
      allSet = true;
      for (RSInfo rsInfo : getRsList())
      for (RSInfo rsInfo : getRsInfos())
      {
        // the 'empty' RSes (generationId==-1) are considered as good citizens
        if (rsInfo.getGenerationId() != -1 &&
@@ -2498,7 +2527,7 @@
   *                           connected to a Replication Server or it
   *                           was not possible to contact it.
   */
  public void resetReplicationLog() throws DirectoryException
  void resetReplicationLog() throws DirectoryException
  {
    // Reset the Generation ID to -1 to clean the ReplicationServers.
    resetGenerationId(-1L);
@@ -2913,7 +2942,7 @@
      {
        // create the broker object used to publish and receive changes
        broker = new ReplicationBroker(
            this, state, config, getGenerationID(), new ReplSessionSecurity());
            this, state, config, new ReplSessionSecurity());
        broker.start();
      }
    }
@@ -3067,8 +3096,8 @@
  }
  /**
   * Applies a configuration change to the attributes which should be be
   * included in the ECL.
   * Applies a configuration change to the attributes which should be included
   * in the ECL.
   *
   * @param includeAttributes
   *          attributes to be included with all change records.
@@ -3385,7 +3414,7 @@
   * @param msg  The byte array containing the information that should
   *             be sent to the remote entities.
   */
  public void publish(byte[] msg)
  void publish(byte[] msg)
  {
    UpdateMsg update;
    synchronized (this)
@@ -3489,46 +3518,16 @@
      Set<String> includeAttributes,
      Set<String> includeAttributesForDeletes)
  {
    boolean configurationChanged = false;
    synchronized (eclIncludesLock)
    ECLIncludes current;
    ECLIncludes updated;
    do
    {
      Set<String> s1 = new HashSet<String>(includeAttributes);
      // Combine all+delete attributes.
      Set<String> s2 = new HashSet<String>(s1);
      s2.addAll(includeAttributesForDeletes);
      if (!s1.equals(eclIncludesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      current = this.eclIncludes.get();
      updated = current.addIncludedAttributes(
          serverId, includeAttributes, includeAttributesForDeletes);
      }
      if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer.put(serverId,
            Collections.unmodifiableSet(s2));
      }
      // and rebuild the global list to be ready for usage
      Set<String> s = new HashSet<String>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        s.addAll(attributes);
      }
      eclIncludesAllServers = Collections.unmodifiableSet(s);
      s = new HashSet<String>();
      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
      {
        s.addAll(attributes);
      }
      eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s);
    }
    return configurationChanged;
    while (!this.eclIncludes.compareAndSet(current, updated));
    return current != updated;
  }
@@ -3540,10 +3539,7 @@
   */
  public Set<String> getEclIncludes()
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesAllServers;
    }
    return eclIncludes.get().includedAttrsAllServers;
  }
@@ -3555,10 +3551,7 @@
   */
  public Set<String> getEclIncludesForDeletes()
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesForDeletesAllServers;
    }
    return eclIncludes.get().includedAttrsForDeletesAllServers;
  }
@@ -3573,10 +3566,7 @@
   */
  Set<String> getEclIncludes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesByServer.get(serverId);
    }
    return eclIncludes.get().includedAttrsByServer.get(serverId);
  }
@@ -3591,10 +3581,7 @@
   */
  Set<String> getEclIncludesForDeletes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesForDeletesByServer.get(serverId);
    }
    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -34,7 +34,7 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.ReplicationDomain.IEContext;
import org.opends.server.replication.service.ReplicationDomain.*;
import org.opends.server.types.*;
/**
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -214,7 +214,7 @@
  {
    final ReplicationBroker broker = new ReplicationBroker(
        new DummyReplicationDomain(generationId), new ServerState(),
        config, generationId, getReplSessionSecurity());
        config, getReplSessionSecurity());
    connect(broker, port, timeout);
    return broker;
  }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -37,6 +37,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ModificationType;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.ReplicationTestCase;
@@ -639,6 +640,7 @@
  private static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  private static final Task NO_INIT_TASK = null;
  private long readGenIdFromSuffixRootEntry(String rootDn) throws Exception
  {
@@ -1059,7 +1061,7 @@
      replicationDomain.initExport(exportLdif, 2);
      // Perform full update from fake domain to fractional domain
      replicationDomain.initializeRemote(DS1_ID);
      replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK);
      /*
       * Check fractional domain is operational and that filtering has been done
@@ -1294,10 +1296,10 @@
      replicationDomain.initExport(exportLdif, 2);
      // Perform full update from fake domain to fractional domain
      replicationDomain.initializeRemote(DS1_ID);
      replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK);
      /*
       * Chack fractional domain is operational and that filtering has been done
       * Check fractional domain is operational and that filtering has been done
       * during the full update
       */
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -73,7 +73,7 @@
    public TestBroker(List<ReplicationMsg> list)
    {
      super(new DummyReplicationDomain(0), null,
          new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), 0, null);
          new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), null);
      this.list = list;
    }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -228,7 +227,7 @@
    fakeCfg.setChangetimeHeartbeatInterval(500);
    ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
    ReplicationBroker broker = new ReplicationBroker(
        new DummyReplicationDomain(generationId), state, fakeCfg, generationId, security);
        new DummyReplicationDomain(generationId), state, fakeCfg, security);
    broker.start();
    checkConnection(30, broker, rs1Port);
    return broker;
@@ -403,20 +402,13 @@
  {
    for (int count = 0; count< 50; count++)
    {
      List<DSInfo> dsList = ds3.getDsList();
      DSInfo ds3Info = null;
      if (dsList.size() > 0)
      {
        ds3Info = dsList.get(0);
      }
      if (ds3Info != null
          && ds3Info.getDsId() == DS2_ID
          && ds3Info.getStatus() == ServerStatus.DEGRADED_STATUS)
      DSInfo dsInfo = ds3.getReplicaInfos().get(DS2_ID);
      if (dsInfo != null && dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS)
      {
        break;
      }
      assertTrue(count < 50, "DS2 did not get degraded : " + ds3Info);
      assertTrue(count < 50, "DS2 did not get degraded : " + dsInfo);
      Thread.sleep(200); // Be sure status analyzer has time to test
    }
  }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -1020,10 +1020,11 @@
          rd.getGroupId(), rd.getRefUrls(),
          rd.getEclIncludes(), rd.getEclIncludesForDeletes(),
          ProtocolVersion.getCurrentVersion());
      final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList());
      final List<DSInfo> dsList =
          new ArrayList<DSInfo>(rd.getReplicaInfos().values());
      dsList.add(dsInfo);
     TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
     TopoView dsTopoView = new TopoView(dsList, rd.getRsInfos());
     assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId);
   }
  }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -32,10 +32,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.DataFormatException;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.ModificationType;
import org.assertj.core.api.Assertions;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -50,6 +52,7 @@
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -1036,59 +1039,45 @@
    assertEquals(bi.toString(16), pdu);
  }
  @DataProvider(name="createTopologyData")
  @DataProvider
  public Object [][] createTopologyData() throws Exception
  {
    List<String> urls1 = new ArrayList<String>();
    urls1.add("ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)");
    urls1.add("ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber");
    List<String> urls1 = newList(
        "ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)",
        "ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber");
    List<String> urls2 = new ArrayList<String>();
    List<String> urls2 = newList();
    List<String> urls3 = new ArrayList<String>();
    urls3.add("ldaps://host:port/dc=foo??sub?(sn=One Entry)");
    List<String> urls3 = newList(
        "ldaps://host:port/dc=foo??sub?(sn=One Entry)");
    List<String> urls4 = new ArrayList<String>();
    urls4.add("ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)");
    urls4.add("ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
    List<String> urls4 = newList(
        "ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)",
        "ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
    DSInfo dsInfo1 = new DSInfo(13, "dsHost1:111", 26, 154631, ServerStatus.FULL_UPDATE_STATUS,
    DSInfo dsInfo1 = new DSInfo(13, "", 26, 154631, ServerStatus.FULL_UPDATE_STATUS,
      false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, new HashSet<String>(), new HashSet<String>(), (short)-1);
    DSInfo dsInfo2 = new DSInfo(-436, "dsHost2:222", 493, -227896, ServerStatus.DEGRADED_STATUS,
    DSInfo dsInfo2 = new DSInfo(-436, "", 493, -227896, ServerStatus.DEGRADED_STATUS,
      true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, new HashSet<String>(), new HashSet<String>(), (short)-1);
    DSInfo dsInfo3 = new DSInfo(2436, "dsHost3:333", 591, 0, ServerStatus.NORMAL_STATUS,
    DSInfo dsInfo3 = new DSInfo(2436, "", 591, 0, ServerStatus.NORMAL_STATUS,
      false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, new HashSet<String>(), new HashSet<String>(), (short)-1);
    DSInfo dsInfo4 = new DSInfo(415, "dsHost4:444", 146, 0, ServerStatus.BAD_GEN_ID_STATUS,
    DSInfo dsInfo4 = new DSInfo(415, "", 146, 0, ServerStatus.BAD_GEN_ID_STATUS,
      true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, new HashSet<String>(), new HashSet<String>(), (short)-1);
    List<DSInfo> dsList1 = new ArrayList<DSInfo>();
    dsList1.add(dsInfo1);
    List<DSInfo> dsList2 = new ArrayList<DSInfo>();
    List<DSInfo> dsList3 = new ArrayList<DSInfo>();
    dsList3.add(dsInfo2);
    List<DSInfo> dsList4 = new ArrayList<DSInfo>();
    dsList4.add(dsInfo4);
    dsList4.add(dsInfo3);
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    Set<DSInfo> dsList1 = newSet(dsInfo1);
    Set<DSInfo> dsList2 = newSet();
    Set<DSInfo> dsList3 = newSet(dsInfo2);
    Set<DSInfo> dsList4 = newSet(dsInfo4, dsInfo3, dsInfo2, dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, null, 45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, null, 0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, null, -21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
    List<RSInfo> rsList2 = new ArrayList<RSInfo>();
    rsList2.add(rsInfo1);
    rsList2.add(rsInfo2);
    rsList2.add(rsInfo3);
    List<RSInfo> rsList1 = newList(rsInfo1);
    List<RSInfo> rsList2 = newList(rsInfo1, rsInfo2, rsInfo3);
    return new Object [][] {
      {"1a01313300323600313534363331000300020c84026c6461703a2f2f6c6461702e697" +
@@ -1098,9 +1087,9 @@
       "6c6570686f6e654e756d6265720001343532370034353331360067",dsList1, rsList1},
      {"1a0003343532370034353331360067343532370030000030002d32313131330062", dsList2, rsList2},
      {"1a012d34333600343933002d32323738393600020101f9f70001343532370034353331360067", dsList3, rsList1},
      {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, new ArrayList<RSInfo>()},
      {"1a0001343532370034353331360067", new ArrayList<DSInfo>(), rsList1},
      {"1a0000", new ArrayList<DSInfo>(), new ArrayList<RSInfo>()},
      {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, newList()},
      {"1a0001343532370034353331360067", newSet(), rsList1},
      {"1a0000", newSet(), newList()},
      {"1a0434313500313436003000040102020f026c646170733a2f2f686f73743a706f727" +
       "42f64633d666f6f626172313f3f7375623f28736e3d416e6f7468657220456e747279" +
       "203129006c646170733a2f2f686f73743a706f72742f64633d666f6f626172323f3f7" +
@@ -1117,16 +1106,22 @@
  }
  @Test(dataProvider = "createTopologyData")
  public void oldTopologyPDUs(String oldPdu, List<DSInfo> dsList, List<RSInfo> rsList)
  public void oldTopologyPDUs(String oldPdu, Set<DSInfo> dsList, List<RSInfo> rsList)
         throws Exception
  {
    TopologyMsg msg = new TopologyMsg(hexStringToByteArray(oldPdu),
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    assertEquals(msg.getDsList(), dsList);
    assertEquals(msg.getRsList(), rsList);
    Assertions.assertThat(new HashSet<DSInfo>(msg.getReplicaInfos().values()))
        .isEqualTo(dsList);
    assertEquals(msg.getRsInfos(), rsList);
    if (msg.getReplicaInfos().values().equals(dsList))
    {
      // Unfortunately this check does not work when the order of the
      // replicaInfos collection is not exactly the same as the dsList
    BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3));
    assertEquals(bi.toString(16), oldPdu);
  }
  }
  @DataProvider(name="createEntryMsgData")
  public Object [][] createEntryMsgData() throws Exception
@@ -1137,9 +1132,8 @@
    int pos = 0;
    int length = 2;
    int msgid = 14;
    Object[] set1 = new Object[] {sid, dest, entryBytes, pos, length, msgid};
    return new Object [][] { set1};
    return new Object[][] { { sid, dest, entryBytes, pos, length, msgid } };
  }
  /**
@@ -1186,8 +1180,7 @@
    int sender = 1;
    int dest = 2;
    LocalizableMessage message = ERR_UNKNOWN_TYPE.get("toto");
    Object[] set1 = new Object[] {sender, dest, message};
    return new Object [][] { set1};
    return new Object[][] { { sender, dest, message } };
  }
  /**
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -916,8 +916,8 @@
    TopologyMsg msg = new TopologyMsg(dsList, rsList);
    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
        ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getDsList(), newMsg.getDsList());
    assertEquals(msg.getRsList(), newMsg.getRsList());
    assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos());
    assertEquals(msg.getRsInfos(), newMsg.getRsInfos());
  }
  /**
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -1725,14 +1725,14 @@
  private void waitForStableTopo(FakeReplicationDomain fakeRd, int expectedDs,
      int expectedRs) throws Exception
  {
    List<DSInfo> dsInfo = null;
    Map<Integer, DSInfo> dsInfo = null;
    List<RSInfo> rsInfo = null;
    long nSec = 0;
    long startTime = System.currentTimeMillis();
    do
    {
      dsInfo = fakeRd.getReplicasList();
      rsInfo = fakeRd.getRsList();
      dsInfo = fakeRd.getReplicaInfos();
      rsInfo = fakeRd.getRsInfos();
      if (dsInfo.size() == expectedDs && rsInfo.size() == expectedRs)
      {
        debugInfo("waitForStableTopo: expected topo obtained after " + nSec + " second(s).");
@@ -3123,8 +3123,7 @@
      // DS must see expected numbers of DSs/RSs
      final FakeReplicationDomain fakeRd1 = fakeRDs[1];
      waitForStableTopo(fakeRd1, 1, 1);
      List<DSInfo> dsInfos = fakeRd1.getReplicasList();
      DSInfo dsInfo = dsInfos.get(0);
      DSInfo dsInfo = fakeRd1.getReplicaInfos().get(FDS2_ID);
      assertEquals(dsInfo.getDsId(), FDS2_ID);
      assertEquals(dsInfo.getStatus(), ServerStatus.NORMAL_STATUS);
@@ -3144,27 +3143,7 @@
      }
      // Wait for DS2 being degraded
      boolean error = true;
      for (int count = 0; count < 12; count++)
      {
        dsInfos = fakeRd1.getReplicasList();
        if (dsInfos == null)
          continue;
        if (dsInfos.size() == 0)
          continue;
        dsInfo = dsInfos.get(0);
        if ( (dsInfo.getDsId() == FDS2_ID) &&
            (dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS) )
        {
          error = false;
          break;
        }
        else
        {
          Thread.sleep(1000);
        }
      }
      assertFalse(error, "DS2 not in degraded status");
      expectStatusForDS(fakeRd1, ServerStatus.DEGRADED_STATUS, FDS2_ID);
      Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
@@ -3226,27 +3205,7 @@
      fakeRd2.startListenService();
      // Wait for DS2 being back to normal
      error = true;
      for (int count = 0; count < 12; count++)
      {
        dsInfos = fakeRd1.getReplicasList();
        if (dsInfos == null)
          continue;
        if (dsInfos.size() == 0)
          continue;
        dsInfo = dsInfos.get(0);
        if ( (dsInfo.getDsId() == FDS2_ID) &&
            (dsInfo.getStatus() == ServerStatus.NORMAL_STATUS) )
        {
          error = false;
          break;
        }
        else
        {
          Thread.sleep(1000);
        }
      }
      assertFalse(error, "DS2 not back to normal status");
      expectStatusForDS(fakeRd1, ServerStatus.NORMAL_STATUS, FDS2_ID);
      // DS2 should also change status so reset its assured monitoring data so no received sr updates
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
@@ -3317,12 +3276,29 @@
      assertEquals(fakeRd2.getReceivedUpdates(), 6);
      assertEquals(fakeRd2.getWrongReceivedUpdates(), 1);
      assertFalse(fakeRd2.receivedUpdatesOk());
    } finally
    }
    finally
    {
      endTest();
    }
  }
  private void expectStatusForDS(final ReplicationDomain domain,
      ServerStatus expectedStatus, int dsId) throws InterruptedException
  {
    for (int count = 0; count < 12; count++)
    {
      final DSInfo dsInfo = domain.getReplicaInfos().get(dsId);
      if (dsInfo != null && dsInfo.getStatus() == expectedStatus)
      {
        return;
      }
      Thread.sleep(1000);
    }
    Assert.fail("DS(" + dsId + ") did not have expected status "
        + expectedStatus + " after 12 seconds");
  }
  private void assertContainsOnly(Map<Integer, Integer> map, int key,
      int expectedValue)
  {
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -359,7 +359,7 @@
      final long generationId = getGenerationId(TEST_ROOT_DN);
      broker = new ReplicationBroker(new DummyReplicationDomain(generationId),
          state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort),
          generationId, getReplSessionSecurity());
          getReplSessionSecurity());
      connect(broker, replicationServerPort, 5000);
      ReplicationMsg receivedMsg = broker.receive();
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java
New file
@@ -0,0 +1,268 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.service;
import java.util.*;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.service.ReplicationBroker.ReplicationServerInfo;
import org.opends.server.replication.service.ReplicationBroker.Topology;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.util.Collections.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.testng.Assert.*;
@SuppressWarnings("javadoc")
public class ReplicationBrokerTest extends DirectoryServerTestCase
{
  private static enum TopologyCtorToUse
  {
    BUILD_WITH_TOPOLOGY_MSG, BUILD_WITH_DS_RS_LISTS;
  }
  private final int CURRENT_RS_ID = 91;
  private final int MISSING_RS_ID = 93;
  private final int ANOTHER_RS_ID = 94;
  private final DSInfo CURRENT_DS = dsInfo(11, CURRENT_RS_ID);
  private final DSInfo OTHER_DS = dsInfo(12, CURRENT_RS_ID);
  private final DSInfo MISSING_DS = dsInfo(13, CURRENT_RS_ID);
  private final ReplicationServerInfo CURRENT_RS = rsInfo(CURRENT_RS_ID,
      CURRENT_DS.getDsId(), OTHER_DS.getDsId());
  private final ReplicationServerInfo MISSING_RS = rsInfo(MISSING_RS_ID,
      MISSING_DS.getDsId());
  private final ReplicationServerInfo ANOTHER_RS = rsInfo(ANOTHER_RS_ID);
  @SuppressWarnings("unchecked")
  private DSInfo dsInfo(int dsServerId, int rsServerId)
  {
    byte z = 0;
    return new DSInfo(dsServerId, null, rsServerId, 0, null,
        false, null, z, z, EMPTY_LIST, EMPTY_LIST, EMPTY_LIST, z);
  }
  private ReplicationServerInfo rsInfo(int rsServerId, Integer... dsIds)
  {
    byte z = 0;
    final RSInfo info = new RSInfo(rsServerId, rsServerId + ":1389", 0, z, 0);
    return new ReplicationServerInfo(info, newSet(dsIds));
  }
  private Map<Integer, ReplicationServerInfo> newMap(ReplicationServerInfo... infos)
  {
    if (infos.length == 0)
    {
      return Collections.emptyMap();
    }
    final Map<Integer, ReplicationServerInfo> map =
        new HashMap<Integer, ReplicationServerInfo>();
    for (ReplicationServerInfo info : infos)
    {
      map.put(info.getServerId(), info);
    }
    return map;
  }
  private void assertInvariants(final Topology topo)
  {
    assertThat(topo.replicaInfos).doesNotContainKey(CURRENT_DS.getDsId());
  }
  private ReplicationServerInfo assertContainsRSWithDSs(
      Map<Integer, ReplicationServerInfo> rsInfos,
      ReplicationServerInfo rsInfo, Integer... connectedDSs)
  {
    return assertContainsRSWithDSs(rsInfos, rsInfo, newSet(connectedDSs));
  }
  private ReplicationServerInfo assertContainsRSWithDSs(
      Map<Integer, ReplicationServerInfo> rsInfos,
      ReplicationServerInfo rsInfo, Set<Integer> connectedDSs)
  {
    final ReplicationServerInfo info = find(rsInfos, rsInfo.toRSInfo());
    assertNotNull(info);
    assertThat(info.getConnectedDSs()).containsAll(connectedDSs);
    return info;
  }
  private ReplicationServerInfo find(Map<Integer, ReplicationServerInfo> rsInfos, RSInfo rsInfo)
  {
    for (ReplicationServerInfo info : rsInfos.values())
    {
      if (info.getServerId() == rsInfo.getId())
      {
        return info;
      }
    }
    return null;
  }
  private Topology newTopology(TopologyCtorToUse toUse,
      Map<Integer, DSInfo> replicaInfos, List<RSInfo> rsInfos, int dsServerId, int rsServerId,
      Set<String> rsUrls, Map<Integer, ReplicationServerInfo> previousRSs)
  {
    if (TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG == toUse)
    {
      final TopologyMsg topologyMsg = new TopologyMsg(replicaInfos.values(), rsInfos);
      return new Topology(topologyMsg, dsServerId, rsServerId, rsUrls, previousRSs);
    }
    else if (TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS == toUse)
    {
      return new Topology(replicaInfos, rsInfos, dsServerId, rsServerId, rsUrls, previousRSs);
    }
    Assert.fail("Do not know which Topology constructor to use: " + toUse);
    return null;
  }
  private Map<Integer, DSInfo> newMap(DSInfo... dsInfos)
  {
    final Map<Integer, DSInfo> results = new HashMap<Integer, DSInfo>();
    for (DSInfo dsInfo : dsInfos)
    {
      results.put(dsInfo.getDsId(), dsInfo);
    }
    return results;
  }
  @DataProvider
  public Object[][] topologyCtorProvider() {
    return new Object[][] { { TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG },
      { TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS } };
  }
  @Test(dataProvider = "topologyCtorProvider")
  @SuppressWarnings("unchecked")
  public void topologyShouldContainNothing(TopologyCtorToUse toUse)
      throws Exception
  {
    final Topology topo = newTopology(toUse,
        EMPTY_MAP, EMPTY_LIST,
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
    assertInvariants(topo);
    assertThat(topo.rsInfos).isEmpty();
  }
  @Test
  @SuppressWarnings("unchecked")
  public void topologyShouldFilterOutCurrentDS()
  {
    final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG,
        newMap(OTHER_DS, CURRENT_DS), EMPTY_LIST,
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
    assertInvariants(topo);
    assertThat(topo.rsInfos).isEmpty();
  }
  @Test(dataProvider = "topologyCtorProvider")
  @SuppressWarnings("unchecked")
  public void topologyShouldContainRSWithoutOtherDS(TopologyCtorToUse toUse)
  {
    final Topology topo = newTopology(toUse,
        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
    assertInvariants(topo);
    assertThat(topo.rsInfos).hasSize(1);
    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId());
  }
  @Test
  @SuppressWarnings("unchecked")
  public void topologyShouldContainRSWithAllDSs_buildWithTopologyMsg()
  {
    final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG,
        newMap(CURRENT_DS, OTHER_DS), newList(CURRENT_RS.toRSInfo()),
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
    assertInvariants(topo);
    assertThat(topo.rsInfos).hasSize(1);
    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
  }
  @Test(dataProvider = "topologyCtorProvider")
  @SuppressWarnings("unchecked")
  public void topologyShouldStillContainRS(TopologyCtorToUse toUse) throws Exception
  {
    final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS);
    final Topology topo = newTopology(toUse,
        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
    assertInvariants(topo);
    assertThat(topo.rsInfos).hasSize(1);
    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
  }
  @Test(dataProvider = "topologyCtorProvider")
  @SuppressWarnings("unchecked")
  public void topologyShouldStillContainRSWithNewlyProvidedDSs(TopologyCtorToUse toUse)
  {
    final ReplicationServerInfo CURRENT_RS_WITHOUT_DS = rsInfo(CURRENT_RS_ID);
    final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS_WITHOUT_DS);
    final Topology topo = newTopology(toUse,
        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
    assertInvariants(topo);
    assertThat(topo.rsInfos).hasSize(1);
    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId(), OTHER_DS.getDsId());
  }
  @Test(dataProvider = "topologyCtorProvider")
  @SuppressWarnings("unchecked")
  public void topologyShouldHaveRemovedMissingRS(TopologyCtorToUse toUse)
  {
    final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS, MISSING_RS);
    final Topology topo = newTopology(toUse,
        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
    assertInvariants(topo);
    assertThat(topo.rsInfos).hasSize(1);
    assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
  }
  @Test
  @SuppressWarnings("unchecked")
  public void topologyShouldHaveStampedLocallyConfiguredRSs_buildWithDsRsLists()
  {
    final Set<String> locallyConfigured = newSet(CURRENT_RS.getServerURL());
    final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS,
        newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo(), ANOTHER_RS.toRSInfo()),
        CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), locallyConfigured, EMPTY_MAP);
    assertInvariants(topo);
    assertThat(topo.rsInfos).hasSize(2);
    ReplicationServerInfo currentRS =
        assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
    ReplicationServerInfo anotherRS =
        assertContainsRSWithDSs(topo.rsInfos, ANOTHER_RS);
    assertThat(currentRS.isLocallyConfigured()).isTrue();
    assertThat(anotherRS.isLocallyConfigured()).isFalse();
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -26,12 +26,16 @@
 */
package org.opends.server.replication.service;
import java.util.*;
import java.util.Arrays;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.Task;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
@@ -42,6 +46,7 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain.IEContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -54,6 +59,8 @@
@SuppressWarnings("javadoc")
public class ReplicationDomainTest extends ReplicationTestCase
{
  private static final Task NO_INIT_TASK = null;
  @DataProvider(name = "publishAndReceiveData")
  public Object[][] createpublishAndReceiveData()
  {
@@ -116,14 +123,14 @@
      assertNotNull(rcvdMsg);
      assertEquals(test, rcvdMsg.getPayload());
      for (RSInfo replServerInfo : domain1.getRsList())
      for (RSInfo replServerInfo : domain1.getRsInfos())
      {
        // The generation Id of the remote should be 1
        assertEquals(replServerInfo.getGenerationId(), 1,
            "Unexpected value of generationId in RSInfo for RS=" + replServerInfo);
      }
      for (DSInfo serverInfo : domain1.getReplicasList())
      for (DSInfo serverInfo : domain1.getReplicaInfos().values())
      {
        assertEquals(serverInfo.getStatus(), ServerStatus.NORMAL_STATUS);
      }
@@ -132,7 +139,7 @@
      domain1.resetReplicationLog();
      Thread.sleep(500);
      for (RSInfo replServerInfo : domain1.getRsList())
      for (RSInfo replServerInfo : domain1.getRsInfos())
      {
        // The generation Id of the remote should now be 2
        assertEquals(replServerInfo.getGenerationId(), 2,
@@ -144,9 +151,9 @@
      {
        try
        {
          assertExpectedServerStatuses(domain1.getReplicasList(),
          assertExpectedServerStatuses(domain1.getReplicaInfos(),
              domain1ServerId, domain2ServerId);
          assertExpectedServerStatuses(domain2.getReplicasList(),
          assertExpectedServerStatuses(domain2.getReplicaInfos(),
              domain1ServerId, domain2ServerId);
          Map<Integer, ServerState> states1 = domain1.getReplicaStates();
@@ -178,13 +185,15 @@
    }
  }
  private void assertExpectedServerStatuses(List<DSInfo> dsInfos,
  private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos,
      int domain1ServerId, int domain2ServerId)
  {
    for (DSInfo serverInfo : dsInfos)
    for (DSInfo serverInfo : dsInfos.values())
    {
      if (serverInfo.getDsId() == domain2ServerId)
      {
        assertEquals(serverInfo.getStatus(), ServerStatus.BAD_GEN_ID_STATUS);
      }
      else
      {
        assertEquals(serverInfo.getDsId(), domain1ServerId);
@@ -330,15 +339,7 @@
       * Trigger a total update from domain1 to domain2.
       * Check that the exported data is correctly received on domain2.
       */
      for (DSInfo remoteDS : domain2.getReplicasList())
      {
        if (remoteDS.getDsId() != domain2.getServerId())
        {
          domain2.initializeFromRemote(remoteDS.getDsId());
          break;
        }
      }
      assertTrue(initializeFromRemote(domain2));
      waitEndExport(exportedData, importedData);
      assertExportSucessful(domain1, domain2, exportedData, importedData);
    }
@@ -349,6 +350,19 @@
    }
  }
  private boolean initializeFromRemote(ReplicationDomain domain) throws DirectoryException
  {
    for (DSInfo remoteDS : domain.getReplicaInfos().values())
    {
      if (remoteDS.getDsId() != domain.getServerId())
      {
        domain.initializeFromRemote(remoteDS.getDsId(), NO_INIT_TASK);
        return true;
      }
    }
    return false;
  }
  /**
   * Test that a ReplicationDomain is able to export and import its database
   * across 2 replication servers.
@@ -387,7 +401,7 @@
      domain2 = new FakeReplicationDomain(
          testService, 2, servers2, 0, null, importedData, 0);
      domain2.initializeFromRemote(1);
      domain2.initializeFromRemote(1, NO_INIT_TASK);
      waitEndExport(exportedData, importedData);
      assertExportSucessful(domain1, domain2, exportedData, importedData);
@@ -517,24 +531,11 @@
       * Trigger a total update from domain1 to domain2.
       * Check that the exported data is correctly received on domain2.
       */
      boolean alone = true;
      while (alone)
      {
        for (DSInfo remoteDS : domain1.getReplicasList())
        {
          if (remoteDS.getDsId() != domain1.getServerId())
          {
            alone = false;
            domain1.initializeFromRemote(remoteDS.getDsId() , null);
            break;
          }
        }
        if (alone)
      while (!initializeFromRemote(domain1))
        {
          System.out.println("trying...");
          Thread.sleep(1000);
        }
      }
      System.out.println("waiting");
      Thread.sleep(10000000);
    }