OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR
Improved design for Replication Topology
ReplicationBroker.java + *Test.java:
Extracted the Topology class to encapsulate the dsList and replicationServerInfos fields, now set atomically via an AtomicReference + moved setLocallyConfiguredFlag(), isSameReplicationServerUrl(), computeConnectedDSs() to the Topology class.
Created methods computeNewTopology() and topologyChange() to compute and set the new topology.
Removed the generationID instance variable, which duplicated the one in ReplicationDomain + updated ctor and setGenerationID().
Improved debugging messages.
Renamed getDsList() to getReplicaInfos() + changed return type from List<DSInfo> to Map<Integer, DSInfo>.
Renamed getRsList() to getRsInfos().
Extracted method toRSInfos().
ReplicationBrokerTest.java: ADDED
Added to test the new ReplicationBroker.Topology class.
DSInfo.java:
Changed equals(Set<String>, Set<String>) to equals(Object, Object).
In toString(), hid the assured fields if assured replication is off.
RSInfo.java:
Renamed fields id and serverUrl to rsServerId and rsServerURL.
In toString(), relied on the compiler to generate the String.
TopologyMsg.java:
Renamed fields dsList and rsList to replicaInfos and rsInfos.
Renamed getDsList() to getReplicaInfos() + changed return type from List<DSInfo> to Map<Integer, DSInfo>.
Renamed getRsList() to getRsInfos().
Extracted methods readStrings() and writeStrings().
Code cleanup
Used Javadoc comments.
ReplicationDomain.java, replication*.properties:
Extracted class ECLIncludes to replace eclIncludesLock, eclIncludesByServer, eclIncludesAllServers, eclIncludesForDeletesByServer, eclIncludesForDeletesAllServers + moved setEclIncludes() implementation to this class + removed synchronized blocks from all the getters.
Added baseDN to ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.
Inlined initializeRemote().
Removed unused initializeFromRemote().
Reduced method visibility.
Renamed getReplicaList() to getReplicaInfos() + changed return type from List<DSInfo> to Map<Integer, DSInfo>.
Renamed getRsList() to getRsInfos().
In initializeRemote(), waitForRemoteEndOfInit(), isRemoteDSConnected() and getProtocolVersion(), simplified code.
*.java:
Consequence of the changes above.
Simplified code in a number of places, particularly in the tests.
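For readers unfamiliar with the pattern behind the Topology extraction in ReplicationBroker.java, here is a minimal sketch of an immutable snapshot published through an AtomicReference. It is not the code from this commit: the class TopologySketch, the String-valued maps and the replicaAdded() method are hypothetical simplifications (the real class holds DSInfo and replication server info objects), and the compare-and-set retry loop is an assumption about how a new topology could be set safely.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: readers always see a consistent pair of maps. */
final class TopologySketch {
  /** Immutable snapshot; both maps are replaced together, never mutated. */
  public static final class Topology {
    public final Map<Integer, String> replicaInfos;
    public final Map<Integer, String> rsInfos;

    Topology(Map<Integer, String> replicaInfos, Map<Integer, String> rsInfos) {
      // Defensive copies, so later changes by the caller cannot leak in.
      this.replicaInfos = Collections.unmodifiableMap(
          new HashMap<Integer, String>(replicaInfos));
      this.rsInfos = Collections.unmodifiableMap(
          new HashMap<Integer, String>(rsInfos));
    }
  }

  private final AtomicReference<Topology> topology =
      new AtomicReference<Topology>(new Topology(
          Collections.<Integer, String>emptyMap(),
          Collections.<Integer, String>emptyMap()));

  /** Computes a new snapshot and publishes it, retrying on contention. */
  public void replicaAdded(int dsId, String dsUrl) {
    while (true) {
      Topology current = topology.get();
      Map<Integer, String> replicas =
          new HashMap<Integer, String>(current.replicaInfos);
      replicas.put(dsId, dsUrl);
      Topology updated = new Topology(replicas, current.rsInfos);
      if (topology.compareAndSet(current, updated)) {
        return;
      }
    }
  }

  public Topology getTopology() {
    return topology.get();
  }

  public static void main(String[] args) {
    TopologySketch broker = new TopologySketch();
    Topology before = broker.getTopology();
    broker.replicaAdded(1, "ds1.example.com:389");
    // The old snapshot is untouched; the new one contains the replica.
    System.out.println(before.replicaInfos.size());
    System.out.println(broker.getTopology().replicaInfos);
  }
}
```

Because a reader takes one reference and then works only on that snapshot, it never observes a replica map from one update combined with an RS map from another, which is the kind of inconsistency that separate volatile fields allow.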
1 file added
23 files modified
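The readStrings() and writeStrings() helpers extracted in TopologyMsg.java share one wire layout: a count byte, then each string as UTF-8 bytes followed by a 0 terminator. The standalone sketch below mirrors that layout so it can be tried in isolation. The class name StringListCodecSketch and the nextLength() helper are hypothetical (the real code relies on getNextLength() from its superclass), and StandardCharsets is used instead of the "UTF-8" charset name to avoid the checked exception.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch of the count + NUL-terminated string layout. */
final class StringListCodecSketch {
  /** Writes the element count as one byte, then each string and a 0 byte. */
  public static void writeStrings(ByteArrayOutputStream out,
      Collection<String> col) {
    out.write(col.size()); // only the low 8 bits are written
    for (String elem : col) {
      byte[] bytes = elem.getBytes(StandardCharsets.UTF_8);
      out.write(bytes, 0, bytes.length);
      out.write(0);
    }
  }

  /** Reads the strings into outputCol; returns the position after them. */
  public static int readStrings(byte[] in, int pos,
      Collection<String> outputCol) {
    int count = in[pos++];
    int nRead = 0;
    // The bounds check guards against a truncated or corrupt message.
    while (nRead != count && pos < in.length) {
      int length = nextLength(in, pos);
      outputCol.add(new String(in, pos, length, StandardCharsets.UTF_8));
      pos += length + 1; // skip the string and its 0 terminator
      nRead++;
    }
    return pos;
  }

  /** Stand-in for getNextLength(): number of bytes before the next 0. */
  static int nextLength(byte[] in, int pos) {
    int length = 0;
    while (pos + length < in.length && in[pos + length] != 0) {
      length++;
    }
    return length;
  }

  public static void main(String[] args) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeStrings(out, Arrays.asList("ldap://a:389", "ldap://b:389"));
    List<String> decoded = new ArrayList<String>();
    int end = readStrings(out.toByteArray(), 0, decoded);
    System.out.println(decoded + " read up to position " + end);
  }
}
```

Returning the updated position from readStrings() is what lets the caller chain several collections (referral URLs, ECL includes, ECL includes for deletes) with a single running offset, as the refactored constructor does.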
| | |
| | | Bad msg id sequence during import. Expected:%s Actual:%s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\ |
| | | The following servers did not acknowledge initialization in the expected \ |
| | | time. They are potentially down or too slow. Servers list: %s |
| | | time for domain %s. They are potentially down or too slow. Servers list: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\ |
| | | The following servers did not end initialization being connected with the \ |
| | | right generation (%s). They are potentially stopped or too slow. \ |
| | |
| | | SEVERE_ERR_INIT_IMPORT_FAILURE_190=W\u00e4hrend der Initialisierung von einem Remote-Server ist der folgende Fehler aufgetreten: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Verbindungsfehler mit Replikationsserver %s w\u00e4hrend Import |
| | | SEVERE_ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Ung\u00fcltige Meldungs-ID-Sequenz w\u00e4hrend Import. Erwartet: %s Tats\u00e4chlich: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt f\u00fcr Dom\u00e4ne %s. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Die folgenden Server haben die Initialisierung nicht in Verbindung mit der entsprechenden Generation (%s) beendet. Sie wurden gestoppt oder sind zu langsam. Server-Liste: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Verbindung zu Replikationsserver mit Server-ID=%s w\u00e4hrend Initialisierung von Remote-Server(n) unterbrochen |
| | | SEVERE_ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Initialisierter Server mit Server-ID=%s bei Initialisierung von Remote-Server(n) gestoppt oder zu langsam |
| | |
| | | SEVERE_ERR_INIT_IMPORT_FAILURE_190=Durante la inicializaci\u00f3n desde un servidor remoto, se ha producido el siguiente error: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Error de conexi\u00f3n con el Servidor de repetici\u00f3n %s durante la importaci\u00f3n |
| | | SEVERE_ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Secuencia de Id. de mensaje incorrecto durante la importaci\u00f3n. Se requiere:%s Real:%s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado para el ND de base %s. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Los siguientes servidores no finalizaron la inicializaci\u00f3n estando conectados con la generaci\u00f3n correcta (%s). Posiblemente est\u00e1n detenidos o son demasiado lentos. Lista de servidores: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Al inicializar los servidores remotos, se ha perdido la conexi\u00f3n con el Servidor de repetici\u00f3n con Id. de servidor=%s |
| | | SEVERE_ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Al inicializar los servidores remotos, el servidor inicializado con Id. de servidor=%s posiblemente se ha detenido o es demasiado lento |
| | |
| | | SEVERE_ERR_INIT_IMPORT_FAILURE_190=L'erreur suivante s'est produite lors de l'initialisation \u00e0 partir d'un serveur distant : %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\u00c9chec de la connexion au serveur de r\u00e9plication %s lors de l'importation |
| | | SEVERE_ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Mauvaise s\u00e9quence d'ID de message lors de l'importation. Attendu\u00a0: %s Re\u00e7u\u00a0: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu pour le domaine %s. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Les serveurs suivants n'ont pas termin\u00e9 l'initialisation en cours de connexion avec la bonne g\u00e9n\u00e9ration (%s). Ils sont probablement arr\u00eat\u00e9s ou trop lents. Liste des serveurs\u00a0: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=La connexion au serveur de r\u00e9plication ayant l'identifiant serverId=%s a \u00e9t\u00e9 interrompue lors de l'initalisation du/des serveur(s) distant(s) |
| | | SEVERE_ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Le serveur initialis\u00e9 ayant l'identifiant serverId=%s \u00e9tait probablement arr\u00eat\u00e9 ou trop lent lors de l'initialisation du/des serveur(s) distant(s) |
| | |
| | | SEVERE_ERR_INIT_IMPORT_FAILURE_190=\u30ea\u30e2\u30fc\u30c8\u30b5\u30fc\u30d0\u30fc\u304b\u3089\u306e\u521d\u671f\u5316\u4e2d\u306b\u6b21\u306e\u30a8\u30e9\u30fc\u304c\u767a\u751f\u3057\u307e\u3057\u305f: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\u30a4\u30f3\u30dd\u30fc\u30c8\u4e2d\u306b\u30ec\u30d7\u30ea\u30b1\u30fc\u30b7\u30e7\u30f3\u30b5\u30fc\u30d0\u30fc %s \u3068\u306e\u63a5\u7d9a\u306b\u5931\u6557\u3057\u307e\u3057\u305f |
| | | SEVERE_ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=\u30a4\u30f3\u30dd\u30fc\u30c8\u4e2d\u306b\u9593\u9055\u3063\u305f\u30e1\u30c3\u30bb\u30fc\u30b8 ID \u30b7\u30fc\u30b1\u30f3\u30b9\u304c\u691c\u51fa\u3055\u308c\u307e\u3057\u305f\u3002\u4e88\u60f3: %s \u5b9f\u969b: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\u6b21\u306e\u30b5\u30fc\u30d0\u30fc\u306f\u4e88\u60f3\u6642\u9593\u5185\u306b\u521d\u671f\u5316\u3092\u80af\u5b9a\u5fdc\u7b54\u3057\u307e\u305b\u3093\u3067\u3057\u305f\u3002\u305d\u308c\u3089\u306e\u30b5\u30fc\u30d0\u30fc\u306f\u30c0\u30a6\u30f3\u3057\u3066\u3044\u308b\u304b\u3001\u9045\u3059\u304e\u308b\u53ef\u80fd\u6027\u304c\u3042\u308a\u307e\u3059\u3002\u30b5\u30fc\u30d0\u30fc\u306e\u30ea\u30b9\u30c8: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\u6b21\u306e\u30b5\u30fc\u30d0\u30fc\u306f\u6b63\u3057\u3044\u4e16\u4ee3 (%s) \u3068\u7d50\u3073\u4ed8\u3044\u3066\u3044\u308b\u521d\u671f\u5316\u3092\u7d42\u4e86\u3057\u307e\u305b\u3093\u3067\u3057\u305f\u3002\u305d\u308c\u3089\u306e\u30b5\u30fc\u30d0\u30fc\u306f\u505c\u6b62\u3057\u3066\u3044\u308b\u304b\u3001\u9045\u3059\u304e\u308b\u53ef\u80fd\u6027\u304c\u3042\u308a\u307e\u3059\u3002\u30b5\u30fc\u30d0\u30fc\u306e\u30ea\u30b9\u30c8: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=\u30ea\u30e2\u30fc\u30c8\u30b5\u30fc\u30d0\u30fc\u306e\u521d\u671f\u5316\u6642\u306b\u3001serverId=%s \u306e\u30ec\u30d7\u30ea\u30b1\u30fc\u30b7\u30e7\u30f3\u30b5\u30fc\u30d0\u30fc\u3078\u306e\u63a5\u7d9a\u304c\u5931\u308f\u308c\u307e\u3059 |
| | | SEVERE_ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=\u30ea\u30e2\u30fc\u30c8\u30b5\u30fc\u30d0\u30fc\u306e\u521d\u671f\u5316\u6642\u306b\u3001serverId=%s \u306e\u521d\u671f\u5316\u3055\u308c\u305f\u30b5\u30fc\u30d0\u30fc\u304c\u505c\u6b62\u3057\u3066\u3044\u308b\u304b\u3001\u9045\u3059\u304e\u308b\u53ef\u80fd\u6027\u304c\u3042\u308a\u307e\u3059 |
| | |
| | | SEVERE_ERR_INIT_IMPORT_FAILURE_190=\u5728\u4ece\u8fdc\u7a0b\u670d\u52a1\u5668\u8fdb\u884c\u521d\u59cb\u5316\u671f\u95f4\uff0c\u53d1\u751f\u4ee5\u4e0b\u9519\u8bef: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\u5728\u5bfc\u5165\u671f\u95f4\u4e0e\u590d\u5236\u670d\u52a1\u5668 %s \u8fde\u63a5\u5931\u8d25 |
| | | SEVERE_ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=\u5bfc\u5165\u671f\u95f4\u6d88\u606f ID \u9519\u8bef\u3002\u671f\u671b: %s \u5b9e\u9645: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\u4ee5\u4e0b\u670d\u52a1\u5668\u672a\u5728\u9884\u671f\u7684\u65f6\u95f4\u5185\u786e\u8ba4\u521d\u59cb\u5316\u3002\u53ef\u80fd\u5df2\u5173\u95ed\u6216\u592a\u6162\u3002\u670d\u52a1\u5668\u5217\u8868: %s |
| | | SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\u4ee5\u4e0b\u670d\u52a1\u5668\u5c1a\u672a\u7ed3\u675f\u4ee5\u6b64\u6b63\u786e\u7684\u751f\u6210 (%s)\u6765\u8fde\u63a5\u7684\u521d\u59cb\u5316\u3002\u53ef\u80fd\u5df2\u505c\u6b62\u6216\u592a\u6162\u3002\u670d\u52a1\u5668\u5217\u8868: %s |
| | | SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=\u521d\u59cb\u5316\u8fdc\u7a0b\u670d\u52a1\u5668\u65f6\uff0c\u4e0e\u670d\u52a1\u5668 ID \u4e3a %s \u7684\u590d\u5236\u670d\u52a1\u5668\u7684\u8fde\u63a5\u4e22\u5931 |
| | | SEVERE_ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=\u521d\u59cb\u5316\u8fdc\u7a0b\u670d\u52a1\u5668\u65f6\uff0c\u670d\u52a1\u5668 ID \u4e3a %s \u7684\u5df2\u521d\u59cb\u5316\u670d\u52a1\u5668\u53ef\u80fd\u5df2\u505c\u6b62\u6216\u592a\u6162 |
| | |
| | | && equals(eclIncludesForDeletes, dsInfo.getEclIncludesForDeletes()); |
| | | } |
| | | |
| | | private boolean equals(Set<String> o1, Set<String> o2) |
| | | private boolean equals(Object o1, Object o2) |
| | | { |
| | | return (o1 == null && o2 == null) || (o1 != null && o1.equals(o2)); |
| | | return o1 == null ? o2 == null : o1.equals(o2); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder sb = new StringBuilder(); |
| | | final StringBuilder sb = new StringBuilder(); |
| | | sb.append("DS id: ").append(dsId); |
| | | sb.append(" ; DS url: ").append(dsUrl); |
| | | sb.append(" ; RS id: ").append(rsId); |
| | | sb.append(" ; Generation id: ").append(generationId); |
| | | sb.append(" ; Status: ").append(status); |
| | | sb.append(" ; Assured replication: ").append(assuredFlag); |
| | | sb.append(" ; Assured mode: ").append(assuredMode); |
| | | sb.append(" ; Safe data level: ").append(safeDataLevel); |
| | | if (assuredFlag) |
| | | { |
| | | sb.append(" ; Assured mode: ").append(assuredMode); |
| | | sb.append(" ; Safe data level: ").append(safeDataLevel); |
| | | } |
| | | sb.append(" ; Group id: ").append(groupId); |
| | | sb.append(" ; Protocol version: ").append(protocolVersion); |
| | | sb.append(" ; Referral URLs: ").append(refUrls); |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2012-2013 ForgeRock AS |
| | | * Portions Copyright 2012-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | |
| | | public final class RSInfo |
| | | { |
| | | /** Server id of the RS. */ |
| | | private final int id; |
| | | private final int rsServerId; |
| | | /** Generation Id of the RS. */ |
| | | private final long generationId; |
| | | /** Group id of the RS. */ |
| | |
| | | */ |
| | | private final int weight; |
| | | /** The server URL of the RS. */ |
| | | private final String serverUrl; |
| | | private final String rsServerURL; |
| | | |
| | | /** |
| | | * Creates a new instance of RSInfo with every given info. |
| | | * |
| | | * @param id The RS id |
| | | * @param serverUrl Url of the RS |
| | | * @param rsServerId The RS id |
| | | * @param rsServerURL Url of the RS |
| | | * @param generationId The generation id the RS is using |
| | | * @param groupId RS group id |
| | | * @param weight RS weight |
| | | */ |
| | | public RSInfo(int id, String serverUrl, |
| | | public RSInfo(int rsServerId, String rsServerURL, |
| | | long generationId, byte groupId, int weight) |
| | | { |
| | | this.id = id; |
| | | this.serverUrl = serverUrl; |
| | | this.rsServerId = rsServerId; |
| | | this.rsServerURL = rsServerURL; |
| | | this.generationId = generationId; |
| | | this.groupId = groupId; |
| | | this.weight = weight; |
| | |
| | | */ |
| | | public int getId() |
| | | { |
| | | return id; |
| | | return rsServerId; |
| | | } |
| | | |
| | | /** |
| | |
| | | return false; |
| | | } |
| | | final RSInfo rsInfo = (RSInfo) obj; |
| | | return id == rsInfo.getId() |
| | | return rsServerId == rsInfo.getId() |
| | | && generationId == rsInfo.getGenerationId() |
| | | && groupId == rsInfo.getGroupId() |
| | | && weight == rsInfo.getWeight() |
| | | && ((serverUrl == null && rsInfo.getServerUrl() == null) |
| | | || (serverUrl != null && serverUrl.equals(rsInfo.getServerUrl()))); |
| | | && weight == rsInfo.getWeight(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public int hashCode() |
| | | { |
| | | int hash = 7; |
| | | hash = 17 * hash + this.id; |
| | | hash = 17 * hash + this.rsServerId; |
| | | hash = 17 * hash + (int) (this.generationId ^ (this.generationId >>> 32)); |
| | | hash = 17 * hash + this.groupId; |
| | | hash = 17 * hash + this.weight; |
| | | hash = 17 * hash + (this.serverUrl != null ? this.serverUrl.hashCode() : 0); |
| | | return hash; |
| | | } |
| | | |
| | |
| | | */ |
| | | public String getServerUrl() |
| | | { |
| | | return serverUrl; |
| | | return rsServerURL; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("Id: ").append(id); |
| | | sb.append(" ; Server URL: ").append(serverUrl); |
| | | sb.append(" ; Generation id: ").append(generationId); |
| | | sb.append(" ; Group id: ").append(groupId); |
| | | sb.append(" ; Weight: ").append(weight); |
| | | return sb.toString(); |
| | | return "RS id: " + rsServerId |
| | | + " ; RS URL: " + rsServerURL |
| | | + " ; Generation id: " + generationId |
| | | + " ; Group id: " + groupId |
| | | + " ; Weight: " + weight; |
| | | } |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2007-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | |
| | | /** |
| | | * |
| | | * This class defines a message that is sent: |
| | | * - By a RS to the other RSs in the topology, containing: |
| | | * - the list of DSs directly connected to the RS in the DS list |
| | | * - only this RS in the RS list |
| | | * - the DSs directly connected to the RS in the DS infos |
| | | * - only this RS in the RS infos |
| | | * - By a RS to his connected DSs, containing every DSs and RSs he knows. |
| | | * In that case the message contains: |
| | | * - the list of every DS the RS knows except the destinator DS in the DS list |
| | | * - the list of every connected RSs (including the sending RS) in the RS list |
| | | * - every DSs the RS knows except the destinator DS in the DS infos |
| | | * - every connected RSs (including the sending RS) in the RS infos |
| | | * |
| | | * Exchanging these messages allows to have each RS or DS take |
| | | * appropriate decisions according to the current topology: |
| | |
| | | */ |
| | | public class TopologyMsg extends ReplicationMsg |
| | | { |
| | | // Information for the DS known in the topology |
| | | private final List<DSInfo> dsList; |
| | | // Information for the RS known in the topology |
| | | private final List<RSInfo> rsList; |
| | | /** Information for the DSs (aka replicas) known in the topology. */ |
| | | private final Map<Integer, DSInfo> replicaInfos; |
| | | /** Information for the RSs known in the topology. */ |
| | | private final List<RSInfo> rsInfos; |
| | | |
| | | /** |
| | | * Creates a new changelogInfo message from its encoded form. |
| | |
| | | if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY) |
| | | { |
| | | throw new DataFormatException( |
| | | "Input is not a valid " + this.getClass().getCanonicalName()); |
| | | "Input is not a valid " + getClass().getCanonicalName()); |
| | | } |
| | | |
| | | int pos = 1; |
| | | |
| | | /* Read number of following DS info entries */ |
| | | |
| | | byte nDsInfo = in[pos++]; |
| | | |
| | | /* Read the DS info entries */ |
| | | List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo)); |
| | | while ( (nDsInfo > 0) && (pos < in.length) ) |
| | | Map<Integer, DSInfo> replicaInfos = |
| | | new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo)); |
| | | while (nDsInfo > 0 && pos < in.length) |
| | | { |
| | | /* Read DS id */ |
| | | int length = getNextLength(in, pos); |
| | |
| | | } |
| | | |
| | | /* Read RS id */ |
| | | length = |
| | | getNextLength(in, pos); |
| | | serverIdString = |
| | | new String(in, pos, length, "UTF-8"); |
| | | length = getNextLength(in, pos); |
| | | serverIdString = new String(in, pos, length, "UTF-8"); |
| | | int rsId = Integer.valueOf(serverIdString); |
| | | pos += length + 1; |
| | | |
| | | /* Read the generation id */ |
| | | length = getNextLength(in, pos); |
| | | long generationId = |
| | | Long.valueOf(new String(in, pos, length, |
| | | "UTF-8")); |
| | | long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* Read DS status */ |
| | | ServerStatus status = ServerStatus.valueOf(in[pos++]); |
| | | |
| | | /* Read DS assured flag */ |
| | | boolean assuredFlag; |
| | | assuredFlag = in[pos++] == 1; |
| | | boolean assuredFlag = in[pos++] == 1; |
| | | |
| | | /* Read DS assured mode */ |
| | | AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]); |
| | |
| | | |
| | | /* Read number of referrals URLs */ |
| | | List<String> refUrls = new ArrayList<String>(); |
| | | byte nUrls = in[pos++]; |
| | | byte nRead = 0; |
| | | /* Read urls until expected number read */ |
| | | while ((nRead != nUrls) && |
| | | (pos < in.length) //security |
| | | ) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | String url = new String(in, pos, length, "UTF-8"); |
| | | refUrls.add(url); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | pos = readStrings(in, pos, refUrls); |
| | | |
| | | Set<String> attrs = new HashSet<String>(); |
| | | Set<String> delattrs = new HashSet<String>(); |
| | | short protocolVersion = -1; |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | byte nAttrs = in[pos++]; |
| | | nRead = 0; |
| | | /* Read attrs until expected number read */ |
| | | while ((nRead != nAttrs) && (pos < in.length)) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | String attr = new String(in, pos, length, "UTF-8"); |
| | | attrs.add(attr); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | pos = readStrings(in, pos, attrs); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5) |
| | | { |
| | | nAttrs = in[pos++]; |
| | | nRead = 0; |
| | | /* Read attrs until expected number read */ |
| | | while ((nRead != nAttrs) && (pos < in.length)) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | String attr = new String(in, pos, length, "UTF-8"); |
| | | delattrs.add(attr); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | pos = readStrings(in, pos, delattrs); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | |
| | | /* Read Protocol version */ |
| | | protocolVersion = (short)in[pos++]; |
| | | protocolVersion = in[pos++]; |
| | | } |
| | | |
| | | /* Now create DSInfo and store it in list */ |
| | | |
| | | DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status, |
| | | assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs, |
| | | delattrs, protocolVersion); |
| | | dsList.add(dsInfo); |
| | | /* Now create DSInfo and store it */ |
| | | replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId, |
| | | status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, |
| | | attrs, delattrs, protocolVersion)); |
| | | |
| | | nDsInfo--; |
| | | } |
| | | |
| | | /* Read number of following RS info entries */ |
| | | |
| | | byte nRsInfo = in[pos++]; |
| | | |
| | | /* Read the RS info entries */ |
| | | List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); |
| | | while ( (nRsInfo > 0) && (pos < in.length) ) |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); |
| | | while (nRsInfo > 0 && pos < in.length) |
| | | { |
| | | /* Read RS id */ |
| | | int length = getNextLength(in, pos); |
| | |
| | | |
| | | /* Read the generation id */ |
| | | length = getNextLength(in, pos); |
| | | long generationId = |
| | | Long.valueOf(new String(in, pos, length, |
| | | "UTF-8")); |
| | | long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* Read RS group id */ |
| | |
| | | pos += length + 1; |
| | | } |
| | | |
| | | /* Now create RSInfo and store it in list */ |
| | | |
| | | RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId, |
| | | weight); |
| | | rsList.add(rsInfo); |
| | | /* Now create RSInfo and store it */ |
| | | rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight)); |
| | | |
| | | nRsInfo--; |
| | | } |
| | | |
| | | this.dsList = Collections.unmodifiableList(dsList); |
| | | this.rsList = Collections.unmodifiableList(rsList); |
| | | } catch (UnsupportedEncodingException e) |
| | | this.replicaInfos = Collections.unmodifiableMap(replicaInfos); |
| | | this.rsInfos = Collections.unmodifiableList(rsInfos); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new message from a list of the currently connected servers. |
| | | * |
| | | * @param dsList The list of currently connected DS servers ID. |
| | | * @param rsList The list of currently connected RS servers ID. |
| | | */ |
| | | public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList) |
| | | private int readStrings(byte[] in, int pos, Collection<String> outputCol) |
| | | throws DataFormatException, UnsupportedEncodingException |
| | | { |
| | | if (dsList == null || dsList.isEmpty()) |
| | | byte nAttrs = in[pos++]; |
| | | byte nRead = 0; |
| | | // Read all elements until expected number read |
| | | while (nRead != nAttrs && pos < in.length) |
| | | { |
| | | this.dsList = Collections.emptyList(); |
| | | int length = getNextLength(in, pos); |
| | | outputCol.add(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | return pos; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new message of the currently connected servers. |
| | | * |
| | | * @param dsInfos The collection of currently connected DS servers ID. |
| | | * @param rsInfos The list of currently connected RS servers ID. |
| | | */ |
| | | public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos) |
| | | { |
| | | if (dsInfos == null || dsInfos.isEmpty()) |
| | | { |
| | | this.replicaInfos = Collections.emptyMap(); |
| | | } |
| | | else |
| | | { |
| | | this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList)); |
| | | Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>(); |
| | | for (DSInfo dsInfo : dsInfos) |
| | | { |
| | | replicas.put(dsInfo.getDsId(), dsInfo); |
| | | } |
| | | this.replicaInfos = Collections.unmodifiableMap(replicas); |
| | | } |
| | | |
| | | if (rsList == null || rsList.isEmpty()) |
| | | if (rsInfos == null || rsInfos.isEmpty()) |
| | | { |
| | | this.rsList = Collections.emptyList(); |
| | | this.rsInfos = Collections.emptyList(); |
| | | } |
| | | else |
| | | { |
| | | this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList)); |
| | | this.rsInfos = |
| | | Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos)); |
| | | } |
| | | } |
| | | |
| | |
| | | // Msg encoding |
| | | // ============ |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short version) |
| | | throws UnsupportedEncodingException |
| | | public byte[] getBytes(short version) throws UnsupportedEncodingException |
| | | { |
| | | try |
| | | { |
| | |
| | | oStream.write(MSG_TYPE_TOPOLOGY); |
| | | |
| | | // Put number of following DS info entries |
| | | oStream.write((byte)dsList.size()); |
| | | oStream.write((byte) replicaInfos.size()); |
| | | |
| | | // Put DS info |
| | | for (DSInfo dsInfo : dsList) |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | { |
| | | // Put DS id |
| | | byte[] byteServerId = |
| | |
| | | oStream.write(0); |
| | | } |
| | | // Put RS id |
| | | byteServerId = |
| | | String.valueOf(dsInfo.getRsId()).getBytes("UTF-8"); |
| | | byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8"); |
| | | oStream.write(byteServerId); |
| | | oStream.write(0); |
| | | // Put the generation id |
| | |
| | | // Put DS group id |
| | | oStream.write(dsInfo.getGroupId()); |
| | | |
| | | List<String> refUrls = dsInfo.getRefUrls(); |
| | | // Put number of following URLs as a byte |
| | | oStream.write(refUrls.size()); |
| | | for (String url : refUrls) |
| | | { |
| | | // Write the url and a 0 terminating byte |
| | | oStream.write(url.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | writeStrings(oStream, dsInfo.getRefUrls()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // Put ECL includes |
| | | Set<String> attrs = dsInfo.getEclIncludes(); |
| | | oStream.write(attrs.size()); |
| | | for (String attr : attrs) |
| | | { |
| | | oStream.write(attr.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | writeStrings(oStream, dsInfo.getEclIncludes()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5) |
| | | { |
| | | Set<String> delattrs = dsInfo.getEclIncludesForDeletes(); |
| | | oStream.write(delattrs.size()); |
| | | for (String attr : delattrs) |
| | | { |
| | | oStream.write(attr.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | writeStrings(oStream, dsInfo.getEclIncludesForDeletes()); |
| | | } |
| | | |
| | | oStream.write(dsInfo.getProtocolVersion()); |
| | |
| | | } |
| | | |
| | | // Put number of following RS info entries |
| | | oStream.write((byte)rsList.size()); |
| | | oStream.write((byte) rsInfos.size()); |
| | | |
| | | // Put RS info |
| | | for (RSInfo rsInfo : rsList) |
| | | for (RSInfo rsInfo : rsInfos) |
| | | { |
| | | // Put RS id |
| | | byte[] byteServerId = |
| | |
| | | // never happens |
| | | throw new RuntimeException(e); |
| | | } |
| | | |
| | | } |
| | | |
| | | private void writeStrings(ByteArrayOutputStream oStream, |
| | | Collection<String> col) throws IOException, UnsupportedEncodingException |
| | | { |
| | | // Put collection length as a byte |
| | | oStream.write(col.size()); |
| | | for (String elem : col) |
| | | { |
| | | // Write the element and a 0 terminating byte |
| | | oStream.write(elem.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String dsStr = ""; |
| | | for (DSInfo dsInfo : dsList) |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | { |
| | | dsStr += dsInfo.toString() + "\n----------------------------\n"; |
| | | dsStr += dsInfo + "\n----------------------------\n"; |
| | | } |
| | | |
| | | String rsStr = ""; |
| | | for (RSInfo rsInfo : rsList) |
| | | for (RSInfo rsInfo : rsInfos) |
| | | { |
| | | rsStr += rsInfo.toString() + "\n----------------------------\n"; |
| | | rsStr += rsInfo + "\n----------------------------\n"; |
| | | } |
| | | |
| | | return ("TopologyMsg content: " |
| | | return "TopologyMsg content:" |
| | | + "\n----------------------------" |
| | | + "\nCONNECTED DS SERVERS:" |
| | | + "\n--------------------\n" |
| | | + dsStr |
| | | + "CONNECTED RS SERVERS:" |
| | | + "\n--------------------\n" |
| | | + rsStr + (rsStr.equals("") ? "----------------------------\n" : "")); |
| | | + rsStr |
| | | + (rsStr.equals("") ? "----------------------------\n" : ""); |
| | | } |
| | | |
| | | /** |
| | | * Get the list of DS info. |
| | | * @return The list of DS info |
| | | * Get the DS infos. |
| | | * |
| | | * @return The DS infos |
| | | */ |
| | | public List<DSInfo> getDsList() |
| | | public Map<Integer, DSInfo> getReplicaInfos() |
| | | { |
| | | return dsList; |
| | | return replicaInfos; |
| | | } |
| | | |
| | | /** |
| | | * Get the list of RS info. |
| | | * @return The list of RS info |
| | | * Get the RS infos. |
| | | * |
| | | * @return The RS infos |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | public List<RSInfo> getRsInfos() |
| | | { |
| | | return rsList; |
| | | return rsInfos; |
| | | } |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // List should only contain RS info for sender |
| | | RSInfo rsInfo = inTopoMsg.getRsList().get(0); |
| | | RSInfo rsInfo = inTopoMsg.getRsInfos().get(0); |
| | | weight = rsInfo.getWeight(); |
| | | } |
| | | |
| | |
| | | public void processTopoInfoFromRS(TopologyMsg topoMsg) |
| | | { |
| | | // List should only contain RS info for sender |
| | | final RSInfo rsInfo = topoMsg.getRsList().get(0); |
| | | final RSInfo rsInfo = topoMsg.getRsInfos().get(0); |
| | | generationId = rsInfo.getGenerationId(); |
| | | groupId = rsInfo.getGroupId(); |
| | | weight = rsInfo.getWeight(); |
| | |
| | | clearRemoteLSHandlers(); |
| | | |
| | | // Creates the new structure according to the message received. |
| | | for (DSInfo dsInfo : topoMsg.getDsList()) |
| | | for (DSInfo dsInfo : topoMsg.getReplicaInfos().values()) |
| | | { |
| | | // For each DS connected to the peer RS |
| | | DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId); |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | // @NotNull // for the reference |
| | | private final AtomicReference<ConnectedRS> connectedRS = |
| | | new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS()); |
| | | /** |
| | | * Our replication domain. |
| | | * <p> |
| | | * Can be null for unit test purpose. |
| | | */ |
| | | /** Our replication domain. */ |
| | | private ReplicationDomain domain; |
| | | /** |
| | | * This object is used as a conditional event to be notified about |
| | |
| | | /* |
| | | * Properties for the last topology info received from the network. |
| | | */ |
| | | /** |
| | | * Info for other DSs. |
| | | * <p> |
| | | * Warning: does not contain info for us (for our server id) |
| | | */ |
| | | private volatile List<DSInfo> dsList = new ArrayList<DSInfo>(); |
| | | private volatile long generationID; |
| | | /** Contains the last known state of the replication topology. */ |
| | | private final AtomicReference<Topology> topology = |
| | | new AtomicReference<Topology>(new Topology()); |
| | | /** <pre>@GuardedBy("this")</pre>. */ |
| | | private volatile int updateDoneCount = 0; |
| | | private volatile boolean connectRequiresRecovery = false; |
| | | |
| | | /** |
| | | * The map of replication server info initialized at connection time and |
| | | * regularly updated. This is used to decide to which best suitable |
| | | * replication server one wants to connect. Key: replication server id Value: |
| | | * replication server info for the matching replication server id |
| | | */ |
| | | private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos; |
| | | |
| | | /** |
| | | * This integer defines when the best replication server checking algorithm |
| | | * should be engaged. |
| | | * Every time a monitoring message (each monitoring publisher period) is |
| | |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param config The configuration to use. |
| | | * @param generationId The generationId for the server associated to the |
| | | * provided serverId and for the domain associated to the provided baseDN. |
| | | * @param replSessionSecurity The session security configuration. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, ReplicationDomainCfg config, long generationId, |
| | | ServerState state, ReplicationDomainCfg config, |
| | | ReplSessionSecurity replSessionSecurity) |
| | | { |
| | | this.domain = replicationDomain; |
| | | this.state = state; |
| | | this.config = config; |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | this.generationID = generationId; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | this.halfRcvWindow = rcvWindow / 2; |
| | | |
| | |
| | | */ |
| | | private long getGenerationID() |
| | | { |
| | | generationID = domain.getGenerationID(); |
| | | return generationID; |
| | | return domain.getGenerationID(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void setGenerationID(long generationID) |
| | | { |
| | | this.generationID = generationID; |
| | | } |
| | | |
| | | /** |
| | | * Sets the locally configured flag for the passed ReplicationServerInfo |
| | | * object, analyzing the local configuration. |
| | | * @param rsInfo the Replication server to check and update |
| | | */ |
| | | private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo) |
| | | { |
| | | // Determine if the passed ReplicationServerInfo has a URL that is present |
| | | // in the locally configured replication servers |
| | | String rsUrl = rsInfo.getServerURL(); |
| | | if (rsUrl == null) |
| | | { |
| | | // The ReplicationServerInfo has been generated from a server with |
| | | // no URL in TopologyMsg (i.e: with replication protocol version < 4): |
| | | // ignore this server as we do not know how to connect to it |
| | | rsInfo.setLocallyConfigured(false); |
| | | return; |
| | | } |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | if (isSameReplicationServerUrl(serverUrl, rsUrl)) |
| | | { |
| | | // This RS is locally configured, mark this |
| | | rsInfo.setLocallyConfigured(true); |
| | | rsInfo.setServerURL(serverUrl); |
| | | return; |
| | | } |
| | | } |
| | | rsInfo.setLocallyConfigured(false); |
| | | domain.setGenerationID(generationID); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | // Unsupported message type: should not happen |
| | | throw new IllegalArgumentException("Unexpected PDU type: " |
| | | + msg.getClass().getName() + " :\n" + msg); |
| | | + msg.getClass().getName() + ":\n" + msg); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "Url:" + getServerURL() + " ServerId:" + getServerId() |
| | | + " GroupId:" + getGroupId(); |
| | | return "ReplServerInfo Url:" + getServerURL() |
| | | + " ServerId:" + getServerId() |
| | | + " GroupId:" + getGroupId() |
| | | + " connectedDSs:" + connectedDSs; |
| | | } |
| | | } |
| | | |
| | |
| | | + "elect the preferred one"); |
| | | |
| | | // Get info from every available replication server |
| | | replicationServerInfos = collectReplicationServersInfo(); |
| | | Map<Integer, ReplicationServerInfo> rsInfos = |
| | | collectReplicationServersInfo(); |
| | | computeNewTopology(toRSInfos(rsInfos)); |
| | | |
| | | if (replicationServerInfos.isEmpty()) |
| | | if (rsInfos.isEmpty()) |
| | | { |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | |
| | | { |
| | | // At least one server answered, find the best one. |
| | | RSEvaluations evals = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | rsInfos, serverId, getGroupId(), getGenerationID()); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | |
| | | Update replication server info with potentially more up to date |
| | | data (server state for instance may have changed) |
| | | */ |
| | | replicationServerInfos |
| | | .put(electedRsInfo.getServerId(), electedRsInfo); |
| | | rsInfos.put(electedRsInfo.getServerId(), electedRsInfo); |
| | | |
| | | // Handshake phase 1 exchange went well |
| | | |
| | |
| | | connectionError = true; |
| | | connectPhaseLock.notify(); |
| | | |
| | | if (replicationServerInfos.size() > 0) |
| | | if (rsInfos.size() > 0) |
| | | { |
| | | logError(WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, baseDN.toNormalizedString(), |
| | | collectionToString(replicationServerInfos.keySet(), ", "))); |
| | | collectionToString(rsInfos.keySet(), ", "))); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void computeNewTopology(List<RSInfo> newRSInfos) |
| | | { |
| | | final int rsServerId = getRsServerId(); |
| | | |
| | | Topology oldTopo; |
| | | Topology newTopo; |
| | | do |
| | | { |
| | | oldTopo = topology.get(); |
| | | newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(), |
| | | rsServerId, getReplicationServerUrls(), oldTopo.rsInfos); |
| | | } |
| | | while (!topology.compareAndSet(oldTopo, newTopo)); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo(topologyChange(rsServerId, oldTopo, newTopo)); |
| | | } |
| | | } |
| | | |
| | | private StringBuilder topologyChange(int rsServerId, Topology oldTopo, |
| | | Topology newTopo) |
| | | { |
| | | final StringBuilder sb = new StringBuilder(); |
| | | sb.append("rsServerId=").append(rsServerId); |
| | | if (newTopo.equals(oldTopo)) |
| | | { |
| | | sb.append(", unchangedTopology=").append(newTopo); |
| | | } |
| | | else |
| | | { |
| | | sb.append(", oldTopology=").append(oldTopo); |
| | | sb.append(", newTopology=").append(newTopo); |
| | | } |
| | | return sb; |
| | | } |
| | | |
| | | /** |
| | | * Connects to a replication server. |
| | | * |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("end restart : connected=" + rs.isConnected() + " with RS(" |
| | | + rs.getServerId() + ") genId=" + generationID); |
| | | + rs.getServerId() + ") genId=" + getGenerationID()); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | credit = |
| | | currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | credit = true; |
| | | } |
| | |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("publish(): IOException caught: " |
| | | + stackTraceToSingleLineString(e)); |
| | | } |
| | | if (!retryOnFailure) |
| | | { |
| | | return false; |
| | |
| | | try |
| | | { |
| | | connectPhaseLock.wait(100); |
| | | } catch (InterruptedException e1) |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // ignore |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("publish(): Interrupted exception raised : " |
| | | + e.getLocalizedMessage()); |
| | | debugInfo("publish(): InterruptedException caught 1: " |
| | | + stackTraceToSingleLineString(ignored)); |
| | | } |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // just loop. |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("publish(): Interrupted exception raised." |
| | | + e.getLocalizedMessage()); |
| | | debugInfo("publish(): InterruptedException caught 2: " |
| | | + stackTraceToSingleLineString(ignored)); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | // Update the replication servers ServerStates with new received info |
| | | Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos; |
| | | for (int srvId : toIterable(monitorMsg.rsIterator())) |
| | | { |
| | | ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId); |
| | | final ReplicationServerInfo rsInfo = rsInfos.get(srvId); |
| | | if (rsInfo != null) |
| | | { |
| | | rsInfo.update(monitorMsg.getRSServerState(srvId)); |
| | |
| | | { |
| | | // Stable topology (no topology message for a few seconds): proceed with |
| | | // checking for the best server. |
| | | final RSEvaluations evals = |
| | | computeBestReplicationServer(false, previousRsServerID, state, |
| | | replicationServerInfos, serverId, getGroupId(), generationID); |
| | | final RSEvaluations evals = computeBestReplicationServer( |
| | | false, previousRsServerID, state, |
| | | rsInfos, serverId, getGroupId(), getGenerationID()); |
| | | final ReplicationServerInfo bestServerInfo = evals.getBestRS(); |
| | | if (previousRsServerID != -1 |
| | | && (bestServerInfo == null |
| | |
| | | * Gets the info for DSs in the topology (except us). |
| | | * @return The info for DSs in the topology (except us) |
| | | */ |
| | | public List<DSInfo> getDsList() |
| | | public Map<Integer, DSInfo> getReplicaInfos() |
| | | { |
| | | return dsList; |
| | | return topology.get().replicaInfos; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The info for RSs in the topology (except the one we are connected |
| | | * to) |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | public List<RSInfo> getRsInfos() |
| | | { |
| | | return toRSInfos(topology.get().rsInfos); |
| | | } |
| | | |
| | | private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos) |
| | | { |
| | | final List<RSInfo> result = new ArrayList<RSInfo>(); |
| | | for (ReplicationServerInfo rsInfo : replicationServerInfos.values()) |
| | | for (ReplicationServerInfo rsInfo : rsInfos.values()) |
| | | { |
| | | result.add(rsInfo.toRSInfo()); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Computes the list of DSs connected to a particular RS. |
| | | * @param rsId The RS id of the server one wants to know the connected DSs |
| | | * @param dsList The list of DSinfo from which to compute things |
| | | * @param rsServerId the serverId to use for the connectedDS |
| | | * @return The list of connected DSs to the server rsId |
| | | */ |
| | | private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList, |
| | | int rsServerId) |
| | | { |
| | | final Set<Integer> connectedDSs = new HashSet<Integer>(); |
| | | if (rsServerId == rsId) |
| | | { |
| | | /* |
| | | If we are computing connected DSs for the RS we are connected |
| | | to, we should count the local DS as the DSInfo of the local DS is not |
| | | sent by the replication server in the topology message. We must count |
| | | ourselves as a connected server. |
| | | */ |
| | | connectedDSs.add(getServerId()); |
| | | } |
| | | |
| | | for (DSInfo dsInfo : dsList) |
| | | { |
| | | if (dsInfo.getRsId() == rsId) |
| | | { |
| | | connectedDSs.add(dsInfo.getDsId()); |
| | | } |
| | | } |
| | | |
| | | return connectedDSs; |
| | | } |
| | | |
| | | /** |
| | | * Processes an incoming TopologyMsg. |
| | | * Updates the structures for the local view of the topology. |
| | | * |
| | |
| | | */ |
| | | private void receiveTopo(TopologyMsg topoMsg, int rsServerId) |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("receive TopologyMsg=" + topoMsg); |
| | | |
| | | // Store new DS list |
| | | dsList = topoMsg.getDsList(); |
| | | |
| | | // Update replication server info list with the received topology |
| | | // information |
| | | final Set<Integer> rssToKeep = new HashSet<Integer>(); |
| | | for (RSInfo rsInfo : topoMsg.getRsList()) |
| | | final Topology newTopo = computeNewTopology(topoMsg, rsServerId); |
| | | for (DSInfo dsInfo : newTopo.replicaInfos.values()) |
| | | { |
| | | final int rsId = rsInfo.getId(); |
| | | rssToKeep.add(rsId); // Mark this server as still existing |
| | | Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId); |
| | | ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId); |
| | | if (rsInfo2 == null) |
| | | { |
| | | // New replication server, create info for it add it to the list |
| | | rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs); |
| | | setLocallyConfiguredFlag(rsInfo2); |
| | | replicationServerInfos.put(rsId, rsInfo2); |
| | | } |
| | | else |
| | | { |
| | | // Update the existing info for the replication server |
| | | rsInfo2.update(rsInfo, connectedDSs); |
| | | } |
| | | domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo |
| | | .getEclIncludesForDeletes()); |
| | | } |
| | | } |
| | | |
| | | private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId) |
| | | { |
| | | Topology oldTopo; |
| | | Topology newTopo; |
| | | do |
| | | { |
| | | oldTopo = topology.get(); |
| | | newTopo = new Topology(topoMsg, getServerId(), rsServerId, |
| | | getReplicationServerUrls(), oldTopo.rsInfos); |
| | | } |
| | | while (!topology.compareAndSet(oldTopo, newTopo)); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo); |
| | | sb.append(" received TopologyMsg=").append(topoMsg); |
| | | debugInfo(sb); |
| | | } |
| | | return newTopo; |
| | | } |
| | | |
| | | /** |
| | | * Contains the last known state of the replication topology. |
| | | */ |
| | | static final class Topology |
| | | { |
| | | |
| | | /** |
| | | * The RS's serverId that this DS was connected to when this topology state |
| | | * was computed. |
| | | */ |
| | | private final int rsServerId; |
| | | /** |
| | | * Info for other DSs. |
| | | * <p> |
| | | * Warning: does not contain info for us (for our server id) |
| | | */ |
| | | final Map<Integer, DSInfo> replicaInfos; |
| | | /** |
| | | * The map of replication server info, initialized at connection time and |
| | | * regularly updated. It is used to choose the most suitable replication |
| | | * server to connect to. Key: replication server id. Value: replication |
| | | * server info for the matching replication server id. |
| | | */ |
| | | final Map<Integer, ReplicationServerInfo> rsInfos; |
| | | |
| | | private Topology() |
| | | { |
| | | this.rsServerId = -1; |
| | | this.replicaInfos = Collections.emptyMap(); |
| | | this.rsInfos = Collections.emptyMap(); |
| | | } |
| | | |
| | | // Remove any replication server that may have disappeared from the topology |
| | | replicationServerInfos.keySet().retainAll(rssToKeep); |
| | | |
| | | for (DSInfo info : dsList) |
| | | /** |
| | | * Constructor to use when only the RSInfos need to be recomputed. |
| | | * |
| | | * @param dsInfosToKeep |
| | | * the DSInfos that will be stored as is |
| | | * @param newRSInfos |
| | | * the new RSInfos from which to compute the new topology |
| | | * @param dsServerId |
| | | * the DS serverId |
| | | * @param rsServerId |
| | | * the current connected RS serverId |
| | | * @param configuredReplicationServerUrls |
| | | * the configured replication server URLs |
| | | * @param previousRsInfos |
| | | * the RSInfos computed in the previous Topology object |
| | | */ |
| | | Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos, |
| | | int dsServerId, int rsServerId, |
| | | Set<String> configuredReplicationServerUrls, |
| | | Map<Integer, ReplicationServerInfo> previousRsInfos) |
| | | { |
| | | domain.setEclIncludes(info.getDsId(), info.getEclIncludes(), |
| | | info.getEclIncludesForDeletes()); |
| | | this.rsServerId = rsServerId; |
| | | this.replicaInfos = dsInfosToKeep; |
| | | this.rsInfos = computeRSInfos(dsServerId, newRSInfos, |
| | | previousRsInfos, configuredReplicationServerUrls); |
| | | } |
| | | |
| | | /** |
| | | * Constructor to use when a new TopologyMsg has been received. |
| | | * |
| | | * @param topoMsg |
| | | * the topology message containing the new DSInfos and RSInfos from |
| | | * which to compute the new topology |
| | | * @param dsServerId |
| | | * the DS serverId |
| | | * @param rsServerId |
| | | * the current connected RS serverId |
| | | * @param configuredReplicationServerUrls |
| | | * the configured replication server URLs |
| | | * @param previousRsInfos |
| | | * the RSInfos computed in the previous Topology object |
| | | */ |
| | | Topology(TopologyMsg topoMsg, int dsServerId, |
| | | int rsServerId, Set<String> configuredReplicationServerUrls, |
| | | Map<Integer, ReplicationServerInfo> previousRsInfos) |
| | | { |
| | | this.rsServerId = rsServerId; |
| | | this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId); |
| | | this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(), |
| | | previousRsInfos, configuredReplicationServerUrls); |
| | | } |
| | | |
| | | private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos, |
| | | int dsServerId) |
| | | { |
| | | final Map<Integer, DSInfo> copy = new HashMap<Integer, DSInfo>(dsInfos); |
| | | copy.remove(dsServerId); |
| | | return Collections.unmodifiableMap(copy); |
| | | } |
| | | |
| | | private Map<Integer, ReplicationServerInfo> computeRSInfos( |
| | | int dsServerId, List<RSInfo> newRsInfos, |
| | | Map<Integer, ReplicationServerInfo> previousRsInfos, |
| | | Set<String> configuredReplicationServerUrls) |
| | | { |
| | | final Map<Integer, ReplicationServerInfo> results = |
| | | new HashMap<Integer, ReplicationServerInfo>(previousRsInfos); |
| | | |
| | | // Update replication server info list with the received topology info |
| | | final Set<Integer> rssToKeep = new HashSet<Integer>(); |
| | | for (RSInfo newRSInfo : newRsInfos) |
| | | { |
| | | final int rsId = newRSInfo.getId(); |
| | | rssToKeep.add(rsId); // Mark this server as still existing |
| | | Set<Integer> connectedDSs = |
| | | computeDSsConnectedTo(rsId, dsServerId); |
| | | ReplicationServerInfo rsInfo = results.get(rsId); |
| | | if (rsInfo == null) |
| | | { |
| | | // New replication server, create info for it and add it to the list |
| | | rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs); |
| | | setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls); |
| | | results.put(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | | // Update the existing info for the replication server |
| | | rsInfo.update(newRSInfo, connectedDSs); |
| | | } |
| | | } |
| | | |
| | | // Remove any replication server that may have disappeared from the |
| | | // topology |
| | | results.keySet().retainAll(rssToKeep); |
| | | |
| | | return Collections.unmodifiableMap(results); |
| | | } |
| | | |
| | | /** Computes the list of DSs connected to a particular RS. */ |
| | | private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId) |
| | | { |
| | | final Set<Integer> connectedDSs = new HashSet<Integer>(); |
| | | if (rsServerId == rsId) |
| | | { |
| | | /* |
| | | * If we are computing connected DSs for the RS we are connected to, we |
| | | * should count the local DS as the DSInfo of the local DS is not sent |
| | | * by the replication server in the topology message. We must count |
| | | * ourselves as a connected server. |
| | | */ |
| | | connectedDSs.add(dsServerId); |
| | | } |
| | | |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | { |
| | | if (dsInfo.getRsId() == rsId) |
| | | { |
| | | connectedDSs.add(dsInfo.getDsId()); |
| | | } |
| | | } |
| | | |
| | | return connectedDSs; |
| | | } |
| | | |
| | | /** |
| | | * Sets the locally configured flag for the passed ReplicationServerInfo |
| | | * object, analyzing the local configuration. |
| | | * |
| | | * @param rsInfo |
| | | * the Replication server to check and update |
| | | * @param configuredReplicationServerUrls |
| | | * the configured replication server URLs |
| | | */ |
| | | private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo, |
| | | Set<String> configuredReplicationServerUrls) |
| | | { |
| | | // Determine if the passed ReplicationServerInfo has a URL that is present |
| | | // in the locally configured replication servers |
| | | String rsUrl = rsInfo.getServerURL(); |
| | | if (rsUrl == null) |
| | | { |
| | | // The ReplicationServerInfo has been generated from a server with |
| | | // no URL in TopologyMsg (i.e: with replication protocol version < 4): |
| | | // ignore this server as we do not know how to connect to it |
| | | rsInfo.setLocallyConfigured(false); |
| | | return; |
| | | } |
| | | for (String serverUrl : configuredReplicationServerUrls) |
| | | { |
| | | if (isSameReplicationServerUrl(serverUrl, rsUrl)) |
| | | { |
| | | // This RS is locally configured, mark this |
| | | rsInfo.setLocallyConfigured(true); |
| | | rsInfo.setServerURL(serverUrl); |
| | | return; |
| | | } |
| | | } |
| | | rsInfo.setLocallyConfigured(false); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean equals(Object obj) |
| | | { |
| | | if (this == obj) |
| | | { |
| | | return true; |
| | | } |
| | | if (obj == null || getClass() != obj.getClass()) |
| | | { |
| | | return false; |
| | | } |
| | | final Topology other = (Topology) obj; |
| | | return rsServerId == other.rsServerId |
| | | && equals(replicaInfos, other.replicaInfos) |
| | | && equals(rsInfos, other.rsInfos) |
| | | && urlsEqual1(replicaInfos, other.replicaInfos) |
| | | && urlsEqual2(rsInfos, other.rsInfos); |
| | | } |
| | | |
| | | private boolean equals(Object o1, Object o2) |
| | | { |
| | | return o1 == null ? o2 == null : o1.equals(o2); |
| | | } |
| | | |
| | | private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1, |
| | | Map<Integer, DSInfo> replicaInfos2) |
| | | { |
| | | for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet()) |
| | | { |
| | | DSInfo dsInfo = replicaInfos2.get(entry.getKey()); |
| | | if (!equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl())) |
| | | { |
| | | return false; |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1, |
| | | Map<Integer, ReplicationServerInfo> rsInfos2) |
| | | { |
| | | for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet()) |
| | | { |
| | | ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey()); |
| | | if (!equals(entry.getValue().getServerURL(), rsInfo.getServerURL())) |
| | | { |
| | | return false; |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | final int prime = 31; |
| | | int result = 1; |
| | | result = prime * result + rsServerId; |
| | | result = prime * result |
| | | + (replicaInfos == null ? 0 : replicaInfos.hashCode()); |
| | | result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode()); |
| | | return result; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos |
| | | + ", rsInfos=" + rsInfos.values(); |
| | | } |
| | | } |
| | | |
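The compare-and-set loops in both computeNewTopology() overloads above publish an immutable Topology through an AtomicReference. The following is a minimal sketch of that pattern, not the broker's actual code: TopologySnapshot, TopologyHolder, replaceRsUrls() and the simplified fields (replica ids and RS URLs instead of DSInfo and ReplicationServerInfo) are hypothetical names introduced only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, reduced stand-in for ReplicationBroker.Topology. */
final class TopologySnapshot
{
  final int rsServerId;
  final Set<Integer> replicaIds;     // unmodifiable
  final Map<Integer, String> rsUrls; // unmodifiable, RS serverId -> URL

  TopologySnapshot(int rsServerId, Set<Integer> replicaIds,
      Map<Integer, String> rsUrls)
  {
    this.rsServerId = rsServerId;
    // Defensive copies: later changes to the caller's collections
    // cannot leak into a published snapshot.
    this.replicaIds =
        Collections.unmodifiableSet(new HashSet<Integer>(replicaIds));
    this.rsUrls =
        Collections.unmodifiableMap(new HashMap<Integer, String>(rsUrls));
  }
}

/** Hypothetical holder showing the compare-and-set publication loop. */
final class TopologyHolder
{
  private final AtomicReference<TopologySnapshot> topology =
      new AtomicReference<TopologySnapshot>(new TopologySnapshot(-1,
          Collections.<Integer> emptySet(),
          Collections.<Integer, String> emptyMap()));

  /** Readers take the current snapshot without any lock. */
  TopologySnapshot current()
  {
    return topology.get();
  }

  /** Replaces the RS part of the snapshot and keeps the replica part. */
  TopologySnapshot replaceRsUrls(int rsServerId,
      Map<Integer, String> newRsUrls)
  {
    TopologySnapshot oldTopo;
    TopologySnapshot newTopo;
    do
    {
      // Recompute from the latest snapshot on every attempt, so that a
      // concurrent update between get() and compareAndSet() is not lost.
      oldTopo = topology.get();
      newTopo =
          new TopologySnapshot(rsServerId, oldTopo.replicaIds, newRsUrls);
    }
    while (!topology.compareAndSet(oldTopo, newTopo));
    return newTopo;
  }
}
```

A caller that holds the result of current() sees a consistent pair of collections even if another thread publishes a new snapshot in the meantime, which separate volatile fields such as the old dsList and replicationServerInfos could not guarantee.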
| | |
| | | .append(" \"").append(getBaseDN()).append(" ") |
| | | .append(getServerId()).append("\",") |
| | | .append(" groupId=").append(getGroupId()) |
| | | .append(", genId=").append(generationID) |
| | | .append(", genId=").append(getGenerationID()) |
| | | .append(", "); |
| | | connectedRS.get().toString(sb); |
| | | return sb.toString(); |
| | | } |
| | | |
| | | private void debugInfo(String message) |
| | | private void debugInfo(CharSequence message) |
| | | { |
| | | TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN() |
| | | + " and serverId=" + getServerId() + " " + message); |
| | | + " and serverId=" + getServerId() + ": " + message); |
| | | } |
| | | } |
| | |
| | | * <p> |
| | | * Full Initialization of a replica can be triggered by LDAP clients |
| | | * by creating InitializeTasks or InitializeTargetTask. |
| | | * Full initialization can also by triggered from the ReplicationDomain |
| | | * implementation using methods {@link #initializeRemote(int)} |
| | | * or {@link #initializeFromRemote(int)}. |
| | | * Full initialization can also be triggered from the ReplicationDomain |
| | | * implementation using methods {@link #initializeRemote(int, Task)} |
| | | * or {@link #initializeFromRemote(int, Task)}. |
| | | * <p> |
| | | * At shutdown time, the {@link #disableService()} method should be called to |
| | | * cleanly stop the replication service. |
| | | */ |
| | | public abstract class ReplicationDomain |
| | | { |
| | | |
| | | /** |
| | | * Contains all the attributes included for the ECL (External Changelog). |
| | | */ |
| | | // @Immutable |
| | | private final static class ECLIncludes |
| | | { |
| | | |
| | | final Map<Integer, Set<String>> includedAttrsByServer; |
| | | final Set<String> includedAttrsAllServers; |
| | | |
| | | final Map<Integer, Set<String>> includedAttrsForDeletesByServer; |
| | | final Set<String> includedAttrsForDeletesAllServers; |
| | | |
| | | private ECLIncludes( |
| | | Map<Integer, Set<String>> includedAttrsByServer, |
| | | Set<String> includedAttrsAllServers, |
| | | Map<Integer, Set<String>> includedAttrsForDeletesByServer, |
| | | Set<String> includedAttrsForDeletesAllServers) |
| | | { |
| | | this.includedAttrsByServer = includedAttrsByServer; |
| | | this.includedAttrsAllServers = includedAttrsAllServers; |
| | | |
| | | this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer; |
| | | this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers; |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | public ECLIncludes() |
| | | { |
| | | this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP, |
| | | Collections.EMPTY_SET); |
| | | } |
| | | |
| | | /** |
| | | * Add attributes to be included in the ECL. |
| | | * |
| | | * @param serverId |
| | | * Server where these attributes are configured. |
| | | * @param includeAttributes |
| | | * Attributes to be included with all change records, may include |
| | | * wild-cards. |
| | | * @param includeAttributesForDeletes |
| | | * Additional attributes to be included with delete change records, |
| | | * may include wild-cards. |
| | | * @return a new {@link ECLIncludes} object if included attributes have |
| | | * changed, or the current object otherwise. |
| | | */ |
| | | public ECLIncludes addIncludedAttributes(int serverId, |
| | | Set<String> includeAttributes, Set<String> includeAttributesForDeletes) |
| | | { |
| | | boolean configurationChanged = false; |
| | | |
| | | Set<String> s1 = new HashSet<String>(includeAttributes); |
| | | |
| | | // Combine all+delete attributes. |
| | | Set<String> s2 = new HashSet<String>(s1); |
| | | s2.addAll(includeAttributesForDeletes); |
| | | |
| | | Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer; |
| | | if (!s1.equals(this.includedAttrsByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesByServer = new HashMap<Integer, Set<String>>( |
| | | this.includedAttrsByServer); |
| | | eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); |
| | | } |
| | | |
| | | Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | this.includedAttrsForDeletesByServer; |
| | | if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>( |
| | | this.includedAttrsForDeletesByServer); |
| | | eclIncludesForDeletesByServer.put( |
| | | serverId, Collections.unmodifiableSet(s2)); |
| | | } |
| | | |
| | | if (!configurationChanged) |
| | | { |
| | | return this; |
| | | } |
| | | |
| | | // and rebuild the global list to be ready for usage |
| | | Set<String> eclIncludesAllServer = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesByServer.values()) |
| | | { |
| | | eclIncludesAllServer.addAll(attributes); |
| | | } |
| | | |
| | | Set<String> eclIncludesForDeletesAllServer = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesForDeletesByServer.values()) |
| | | { |
| | | eclIncludesForDeletesAllServer.addAll(attributes); |
| | | } |
| | | return new ECLIncludes(eclIncludesByServer, |
| | | Collections.unmodifiableSet(eclIncludesAllServer), |
| | | eclIncludesForDeletesByServer, |
| | | Collections.unmodifiableSet(eclIncludesForDeletesAllServer)); |
| | | } |
| | | } |
| | | |
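addIncludedAttributes() above returns either the same ECLIncludes instance or a new one, which fits a copy-on-write update through the AtomicReference field. The sketch below shows one way such an update could be applied. It is an assumption, since the body of setEclIncludes() is not part of this excerpt, and IncludesSketch, IncludesHolder and withAttributes() are hypothetical names that track a single attribute set per server.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, reduced version of ECLIncludes (one set per server). */
final class IncludesSketch
{
  final Map<Integer, Set<String>> byServer;
  final Set<String> allServers;

  IncludesSketch()
  {
    this(Collections.<Integer, Set<String>> emptyMap(),
        Collections.<String> emptySet());
  }

  private IncludesSketch(Map<Integer, Set<String>> byServer,
      Set<String> allServers)
  {
    this.byServer = byServer;
    this.allServers = allServers;
  }

  /** Returns this object when nothing changes, a new object otherwise. */
  IncludesSketch withAttributes(int serverId, Set<String> attrs)
  {
    final Set<String> copy = new HashSet<String>(attrs);
    if (copy.equals(byServer.get(serverId)))
    {
      return this;
    }
    final Map<Integer, Set<String>> newByServer =
        new HashMap<Integer, Set<String>>(byServer);
    newByServer.put(serverId, Collections.unmodifiableSet(copy));

    // Rebuild the union over all servers.
    final Set<String> union = new HashSet<String>();
    for (Set<String> s : newByServer.values())
    {
      union.addAll(s);
    }
    return new IncludesSketch(Collections.unmodifiableMap(newByServer),
        Collections.unmodifiableSet(union));
  }
}

/** Hypothetical owner of the reference, standing in for the domain. */
final class IncludesHolder
{
  private final AtomicReference<IncludesSketch> ref =
      new AtomicReference<IncludesSketch>(new IncludesSketch());

  void set(int serverId, Set<String> attrs)
  {
    IncludesSketch current;
    IncludesSketch updated;
    do
    {
      current = ref.get();
      updated = current.withAttributes(serverId, attrs);
      // Nothing to publish when the configuration did not change.
    }
    while (updated != current && !ref.compareAndSet(current, updated));
  }

  /** Getter needs no synchronized block: it reads one immutable object. */
  IncludesSketch current()
  {
    return ref.get();
  }
}
```

Because every published object is immutable, the getters can read the reference directly, which is consistent with the removal of the synchronized blocks and of eclIncludesLock in this change.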
| | | /** |
| | | * Current status for this replicated domain. |
| | | */ |
| | |
| | | */ |
| | | private final CSNGenerator generator; |
| | | |
| | | private final Object eclIncludesLock = new Object(); |
| | | private final Map<Integer, Set<String>> eclIncludesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesAllServers = Collections.emptySet(); |
| | | |
| | | private final Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet(); |
| | | private final AtomicReference<ECLIncludes> eclIncludes = |
| | | new AtomicReference<ECLIncludes>(new ECLIncludes()); |
| | | |
| | | /** |
| | | * An object used to protect the initialization of the underlying broker |
| | |
| | | * Gets the info for Replicas in the topology (except us). |
| | | * @return The info for Replicas in the topology (except us) |
| | | */ |
| | | public List<DSInfo> getReplicasList() |
| | | public Map<Integer, DSInfo> getReplicaInfos() |
| | | { |
| | | return broker.getDsList(); |
| | | return broker.getReplicaInfos(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * disconnected. Return null when no server with the provided serverId is |
| | | * connected. |
| | | * |
| | | * @param serverId The provided serverId of the remote replica |
| | | * @param dsId The provided serverId of the remote replica |
| | | * @return the info related to this remote server if it is connected, |
| | | * null is the server is NOT connected. |
| | | */ |
| | | public DSInfo isRemoteDSConnected(int serverId) |
| | | private DSInfo isRemoteDSConnected(int dsId) |
| | | { |
| | | for (DSInfo remoteDS : getReplicasList()) |
| | | { |
| | | if (remoteDS.getDsId() == serverId) |
| | | { |
| | | return remoteDS; |
| | | } |
| | | } |
| | | return null; |
| | | return getReplicaInfos().get(dsId); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The info for RSs in the topology (except the one we are connected |
| | | * to) |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | public List<RSInfo> getRsInfos() |
| | | { |
| | | return broker.getRsList(); |
| | | return broker.getRsInfos(); |
| | | } |
| | | |
| | | |
| | |
| | | * for and import, false if the IEContext |
| | | * will be used for and export. |
| | | */ |
| | | public IEContext(boolean importInProgress) |
| | | private IEContext(boolean importInProgress) |
| | | { |
| | | this.importInProgress = importInProgress; |
| | | this.startTime = System.currentTimeMillis(); |
| | |
| | | * @return A boolean indicating if a total update import is currently in |
| | | * Progress. |
| | | */ |
| | | public boolean importInProgress() |
| | | boolean importInProgress() |
| | | { |
| | | return importInProgress; |
| | | } |
| | |
| | | entryCount = total; |
| | | entryLeftCount = total; |
| | | |
| | | if (initializeTask != null) |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | ((InitializeTask)initializeTask).setTotal(entryCount); |
| | | ((InitializeTask)initializeTask).setLeft(entryCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | ((InitializeTargetTask)initializeTask).setTotal(entryCount); |
| | | ((InitializeTargetTask)initializeTask).setLeft(entryCount); |
| | | } |
| | | final InitializeTask task = (InitializeTask) initializeTask; |
| | | task.setTotal(entryCount); |
| | | task.setLeft(entryCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | final InitializeTargetTask task = (InitializeTargetTask) initializeTask; |
| | | task.setTotal(entryCount); |
| | | task.setLeft(entryCount); |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * @throws DirectoryException if an error occurred. |
| | | */ |
| | | public void updateCounters(int entriesDone) throws DirectoryException |
| | | private void updateCounters(int entriesDone) throws DirectoryException |
| | | { |
| | | entryLeftCount -= entriesDone; |
| | | |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "[ Entry count=" + this.entryCount + |
| | | return "[Entry count=" + this.entryCount + |
| | | ", Entry left count=" + this.entryLeftCount + "]"; |
| | | } |
| | | |
| | |
| | | * @param serverId serverId of the acknowledger/receiver/importer server. |
| | | * @param numAck id of the message received. |
| | | */ |
| | | public void setAckVal(int serverId, int numAck) |
| | | private void setAckVal(int serverId, int numAck) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck); |
| | |
| | | if (target >= 0) |
| | | { |
| | | // FIXME Could we check now that it is a know server in the domain ? |
| | | // JNR: Yes please |
| | | } |
| | | return target; |
| | | } |
| | |
| | | * |
| | | * @param target The server-id of the server that should be initialized. |
| | | * The target can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * {@link #getReplicaInfos()} method. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( |
| | | countEntries(), getBaseDNString(), getServerId())); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | ieCtx.startList.add(dsi.getDsId()); |
| | | } |
| | | ieCtx.startList.addAll(getReplicaInfos().keySet()); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | |
| | | ieCtx.startList.add(serverToInitialize); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (dsi.getDsId() == serverToInitialize && |
| | | dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | |
| | | { |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get( |
| | | ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(), |
| | | ieCtx.failureList.toString())); |
| | | } |
| | | |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName() |
| | | + " export ends with " + " connected=" + broker.isConnected() |
| | | + " export ends with connected=" + broker.isConnected() |
| | | + " exportRootException=" + exportRootException); |
| | | |
| | | if (exportRootException != null) |
| | |
| | | do |
| | | { |
| | | done = true; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | considered in the processing of sorting the successfully initialized |
| | | and the others |
| | | */ |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | replicasWeAreWaitingFor.add(dsi.getDsId()); |
| | | } |
| | | replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet()); |
| | | |
| | | boolean done; |
| | | do |
| | |
| | | done = false; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | if (dsInfo.getGenerationId() == getGenerationID()) |
| | | { // and with the expected generationId |
| | | // We're done with this server |
| | | it.remove(); |
| | | } |
| | | |
| | | if (dsInfo.getGenerationId() == getGenerationID()) |
| | | { // and with the expected generationId |
| | | // We're done with this server |
| | | it.remove(); |
| | | } |
| | | } |
| | | } |
| | |
| | | Thread.currentThread().interrupt(); |
| | | } // 1sec |
| | | } |
| | | |
| | | } |
| | | while (!done && !broker.shuttingDown()); // infinite wait |
| | | |
| | |
| | | * |
| | | * @throws IOException when an error occurred. |
| | | */ |
| | | public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) |
| | | throws IOException |
| | | void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) |
| | | throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + |
| | |
| | | } |
| | | |
| | | /** |
| | | * Initializes this domain from another source server. |
| | | * <p> |
| | | * When this method is called, a request for initialization will |
| | | * be sent to the source server asking for initialization. |
| | | * <p> |
| | | * The {@code exportBackend(OutputStream)} will therefore be called |
| | | * on the source server, and the {@code importBackend(InputStream)} |
| | | * will be called on his server. |
| | | * <p> |
| | | * The InputStream and OutpuStream given as a parameter to those |
| | | * methods will be connected through the replication protocol. |
| | | * |
| | | * @param source The server-id of the source from which to initialize. |
| | | * The source can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeFromRemote(int source) throws DirectoryException |
| | | { |
| | | initializeFromRemote(source, null); |
| | | } |
| | | |
| | | /** |
| | | * Initializes a remote server from this server. |
| | | * <p> |
| | | * The {@code exportBackend(OutputStream)} will therefore be called |
| | | * on this server, and the {@code importBackend(InputStream)} |
| | | * will be called on the remote server. |
| | | * <p> |
| | | * The InputStream and OutpuStream given as a parameter to those |
| | | * methods will be connected through the replication protocol. |
| | | * |
| | | * @param target The server-id of the server that should be initialized. |
| | | * The target can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeRemote(int target) throws DirectoryException |
| | | { |
| | | initializeRemote(target, null); |
| | | } |
| | | |
| | | /** |
| | | * Initializes asynchronously this domain from a remote source server. |
| | | * Before returning from this call, for the provided task : |
| | | * - the progressing counters are updated during the initialization using |
| | |
| | | * |
| | | * @param source The server-id of the source from which to initialize. |
| | | * The source can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * {@link #getReplicaInfos()} method. |
| | | * |
| | | * @param initTask The task that launched the initialization |
| | | * and should be updated of its progress. |
| | |
| | | * task has initially been created (this server, |
| | | * or the remote server). |
| | | */ |
| | | void initialize(InitializeTargetMsg initTargetMsgReceived, |
| | | private void initialize(InitializeTargetMsg initTargetMsgReceived, |
| | | int requesterServerId) |
| | | { |
| | | InitializeTask initFromTask = null; |
| | |
| | | ieCtx.importSource = source; |
| | | ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount()); |
| | | ieCtx.initWindow = initTargetMsgReceived.getInitWindow(); |
| | | // Protocol version is -1 when not known. |
| | | ieCtx.exporterProtocolVersion = getProtocolVersion(source); |
| | | initFromTask = (InitializeTask) ieCtx.initializeTask; |
| | | |
| | |
| | | * @param dsServerId The provided serverId. |
| | | * @return The protocol version. |
| | | */ |
| | | short getProtocolVersion(int dsServerId) |
| | | private short getProtocolVersion(int dsServerId) |
| | | { |
| | | short protocolVersion = -1; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | final DSInfo dsInfo = getReplicaInfos().get(dsServerId); |
| | | if (dsInfo != null) |
| | | { |
| | | if (dsi.getDsId() == dsServerId) |
| | | { |
| | | protocolVersion = dsi.getProtocolVersion(); |
| | | break; |
| | | } |
| | | return dsInfo.getProtocolVersion(); |
| | | } |
| | | return protocolVersion; |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | |
| | | for (int i = 0; i< 50; i++) |
| | | { |
| | | allSet = true; |
| | | for (RSInfo rsInfo : getRsList()) |
| | | for (RSInfo rsInfo : getRsInfos()) |
| | | { |
| | | // the 'empty' RSes (generationId==-1) are considered as good citizens |
| | | if (rsInfo.getGenerationId() != -1 && |
| | |
| | | * connected to a Replication Server or it |
| | | * was not possible to contact it. |
| | | */ |
| | | public void resetReplicationLog() throws DirectoryException |
| | | void resetReplicationLog() throws DirectoryException |
| | | { |
| | | // Reset the Generation ID to -1 to clean the ReplicationServers. |
| | | resetGenerationId(-1L); |
| | |
| | | { |
| | | // create the broker object used to publish and receive changes |
| | | broker = new ReplicationBroker( |
| | | this, state, config, getGenerationID(), new ReplSessionSecurity()); |
| | | this, state, config, new ReplSessionSecurity()); |
| | | broker.start(); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Applies a configuration change to the attributes which should be be |
| | | * included in the ECL. |
| | | * Applies a configuration change to the attributes which should be included |
| | | * in the ECL. |
| | | * |
| | | * @param includeAttributes |
| | | * attributes to be included with all change records. |
| | |
| | | * @param msg The byte array containing the information that should |
| | | * be sent to the remote entities. |
| | | */ |
| | | public void publish(byte[] msg) |
| | | void publish(byte[] msg) |
| | | { |
| | | UpdateMsg update; |
| | | synchronized (this) |
| | |
| | | Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | boolean configurationChanged = false; |
| | | |
| | | synchronized (eclIncludesLock) |
| | | ECLIncludes current; |
| | | ECLIncludes updated; |
| | | do |
| | | { |
| | | Set<String> s1 = new HashSet<String>(includeAttributes); |
| | | |
| | | // Combine all+delete attributes. |
| | | Set<String> s2 = new HashSet<String>(s1); |
| | | s2.addAll(includeAttributesForDeletes); |
| | | |
| | | if (!s1.equals(eclIncludesByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); |
| | | } |
| | | |
| | | if (!s2.equals(eclIncludesForDeletesByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesForDeletesByServer.put(serverId, |
| | | Collections.unmodifiableSet(s2)); |
| | | } |
| | | |
| | | // and rebuild the global list to be ready for usage |
| | | Set<String> s = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesByServer.values()) |
| | | { |
| | | s.addAll(attributes); |
| | | } |
| | | eclIncludesAllServers = Collections.unmodifiableSet(s); |
| | | |
| | | s = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesForDeletesByServer.values()) |
| | | { |
| | | s.addAll(attributes); |
| | | } |
| | | eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s); |
| | | current = this.eclIncludes.get(); |
| | | updated = current.addIncludedAttributes( |
| | | serverId, includeAttributes, includeAttributesForDeletes); |
| | | } |
| | | |
| | | return configurationChanged; |
| | | while (!this.eclIncludes.compareAndSet(current, updated)); |
| | | return current != updated; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public Set<String> getEclIncludes() |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesAllServers; |
| | | } |
| | | return eclIncludes.get().includedAttrsAllServers; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public Set<String> getEclIncludesForDeletes() |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesForDeletesAllServers; |
| | | } |
| | | return eclIncludes.get().includedAttrsForDeletesAllServers; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | Set<String> getEclIncludes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesByServer.get(serverId); |
| | | } |
| | | return eclIncludes.get().includedAttrsByServer.get(serverId); |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | Set<String> getEclIncludesForDeletes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesForDeletesByServer.get(serverId); |
| | | } |
| | | return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId); |
| | | } |
| | | |
| | | /** |
| | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.service.ReplicationDomain.IEContext; |
| | | import org.opends.server.replication.service.ReplicationDomain.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | /** |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | |
| | | { |
| | | final ReplicationBroker broker = new ReplicationBroker( |
| | | new DummyReplicationDomain(generationId), new ServerState(), |
| | | config, generationId, getReplSessionSecurity()); |
| | | config, getReplSessionSecurity()); |
| | | connect(broker, port, timeout); |
| | | return broker; |
| | | } |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | |
| | | |
| | | private static final String REPLICATION_GENERATION_ID = |
| | | "ds-sync-generation-id"; |
| | | private static final Task NO_INIT_TASK = null; |
| | | |
| | | private long readGenIdFromSuffixRootEntry(String rootDn) throws Exception |
| | | { |
| | |
| | | replicationDomain.initExport(exportLdif, 2); |
| | | |
| | | // Perform full update from fake domain to fractional domain |
| | | replicationDomain.initializeRemote(DS1_ID); |
| | | replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK); |
| | | |
| | | /* |
| | | * Check fractional domain is operational and that filtering has been done |
| | |
| | | replicationDomain.initExport(exportLdif, 2); |
| | | |
| | | // Perform full update from fake domain to fractional domain |
| | | replicationDomain.initializeRemote(DS1_ID); |
| | | replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK); |
| | | |
| | | /* |
| | | * Chack fractional domain is operational and that filtering has been done |
| | | * Check fractional domain is operational and that filtering has been done |
| | | * during the full update |
| | | */ |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | |
| | | public TestBroker(List<ReplicationMsg> list) |
| | | { |
| | | super(new DummyReplicationDomain(0), null, |
| | | new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), 0, null); |
| | | new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), null); |
| | | this.list = list; |
| | | } |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true); |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | new DummyReplicationDomain(generationId), state, fakeCfg, generationId, security); |
| | | new DummyReplicationDomain(generationId), state, fakeCfg, security); |
| | | broker.start(); |
| | | checkConnection(30, broker, rs1Port); |
| | | return broker; |
| | |
| | | { |
| | | for (int count = 0; count< 50; count++) |
| | | { |
| | | List<DSInfo> dsList = ds3.getDsList(); |
| | | DSInfo ds3Info = null; |
| | | if (dsList.size() > 0) |
| | | { |
| | | ds3Info = dsList.get(0); |
| | | } |
| | | if (ds3Info != null |
| | | && ds3Info.getDsId() == DS2_ID |
| | | && ds3Info.getStatus() == ServerStatus.DEGRADED_STATUS) |
| | | DSInfo dsInfo = ds3.getReplicaInfos().get(DS2_ID); |
| | | if (dsInfo != null && dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | assertTrue(count < 50, "DS2 did not get degraded : " + ds3Info); |
| | | assertTrue(count < 50, "DS2 did not get degraded : " + dsInfo); |
| | | Thread.sleep(200); // Be sure status analyzer has time to test |
| | | } |
| | | } |
| | |
| | | rd.getGroupId(), rd.getRefUrls(), |
| | | rd.getEclIncludes(), rd.getEclIncludesForDeletes(), |
| | | ProtocolVersion.getCurrentVersion()); |
| | | final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList()); |
| | | final List<DSInfo> dsList = |
| | | new ArrayList<DSInfo>(rd.getReplicaInfos().values()); |
| | | dsList.add(dsInfo); |
| | | |
| | | TopoView dsTopoView = new TopoView(dsList, rd.getRsList()); |
| | | TopoView dsTopoView = new TopoView(dsList, rd.getRsInfos()); |
| | | assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId); |
| | | } |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.assertj.core.api.Assertions; |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.core.AddOperationBasis; |
| | | import org.opends.server.core.DirectoryServer; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | |
| | | assertEquals(bi.toString(16), pdu); |
| | | } |
| | | |
| | | @DataProvider(name="createTopologyData") |
| | | @DataProvider |
| | | public Object [][] createTopologyData() throws Exception |
| | | { |
| | | List<String> urls1 = new ArrayList<String>(); |
| | | urls1.add("ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)"); |
| | | urls1.add("ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber"); |
| | | List<String> urls1 = newList( |
| | | "ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)", |
| | | "ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber"); |
| | | |
| | | List<String> urls2 = new ArrayList<String>(); |
| | | List<String> urls2 = newList(); |
| | | |
| | | List<String> urls3 = new ArrayList<String>(); |
| | | urls3.add("ldaps://host:port/dc=foo??sub?(sn=One Entry)"); |
| | | List<String> urls3 = newList( |
| | | "ldaps://host:port/dc=foo??sub?(sn=One Entry)"); |
| | | |
| | | List<String> urls4 = new ArrayList<String>(); |
| | | urls4.add("ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)"); |
| | | urls4.add("ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)"); |
| | | List<String> urls4 = newList( |
| | | "ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)", |
| | | "ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)"); |
| | | |
| | | DSInfo dsInfo1 = new DSInfo(13, "dsHost1:111", 26, 154631, ServerStatus.FULL_UPDATE_STATUS, |
| | | DSInfo dsInfo1 = new DSInfo(13, "", 26, 154631, ServerStatus.FULL_UPDATE_STATUS, |
| | | false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, new HashSet<String>(), new HashSet<String>(), (short)-1); |
| | | |
| | | DSInfo dsInfo2 = new DSInfo(-436, "dsHost2:222", 493, -227896, ServerStatus.DEGRADED_STATUS, |
| | | DSInfo dsInfo2 = new DSInfo(-436, "", 493, -227896, ServerStatus.DEGRADED_STATUS, |
| | | true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, new HashSet<String>(), new HashSet<String>(), (short)-1); |
| | | |
| | | DSInfo dsInfo3 = new DSInfo(2436, "dsHost3:333", 591, 0, ServerStatus.NORMAL_STATUS, |
| | | DSInfo dsInfo3 = new DSInfo(2436, "", 591, 0, ServerStatus.NORMAL_STATUS, |
| | | false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, new HashSet<String>(), new HashSet<String>(), (short)-1); |
| | | |
| | | DSInfo dsInfo4 = new DSInfo(415, "dsHost4:444", 146, 0, ServerStatus.BAD_GEN_ID_STATUS, |
| | | DSInfo dsInfo4 = new DSInfo(415, "", 146, 0, ServerStatus.BAD_GEN_ID_STATUS, |
| | | true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, new HashSet<String>(), new HashSet<String>(), (short)-1); |
| | | |
| | | List<DSInfo> dsList1 = new ArrayList<DSInfo>(); |
| | | dsList1.add(dsInfo1); |
| | | |
| | | List<DSInfo> dsList2 = new ArrayList<DSInfo>(); |
| | | |
| | | List<DSInfo> dsList3 = new ArrayList<DSInfo>(); |
| | | dsList3.add(dsInfo2); |
| | | |
| | | List<DSInfo> dsList4 = new ArrayList<DSInfo>(); |
| | | dsList4.add(dsInfo4); |
| | | dsList4.add(dsInfo3); |
| | | dsList4.add(dsInfo2); |
| | | dsList4.add(dsInfo1); |
| | | Set<DSInfo> dsList1 = newSet(dsInfo1); |
| | | Set<DSInfo> dsList2 = newSet(); |
| | | Set<DSInfo> dsList3 = newSet(dsInfo2); |
| | | Set<DSInfo> dsList4 = newSet(dsInfo4, dsInfo3, dsInfo2, dsInfo1); |
| | | |
| | | RSInfo rsInfo1 = new RSInfo(4527, null, 45316, (byte)103, 1); |
| | | RSInfo rsInfo2 = new RSInfo(4527, null, 0, (byte)0, 1); |
| | | RSInfo rsInfo3 = new RSInfo(0, null, -21113, (byte)98, 1); |
| | | |
| | | List<RSInfo> rsList1 = new ArrayList<RSInfo>(); |
| | | rsList1.add(rsInfo1); |
| | | List<RSInfo> rsList1 = newList(rsInfo1); |
| | | List<RSInfo> rsList2 = newList(rsInfo1, rsInfo2, rsInfo3); |
| | | |
| | | List<RSInfo> rsList2 = new ArrayList<RSInfo>(); |
| | | rsList2.add(rsInfo1); |
| | | rsList2.add(rsInfo2); |
| | | rsList2.add(rsInfo3); |
| | | |
| | | return new Object [][] { |
| | | return new Object[][] { |
| | | {"1a01313300323600313534363331000300020c84026c6461703a2f2f6c6461702e697" + |
| | | "06c616e65742e636f6d2f6f3d746573743f3f7375623f28736e3d4a656e73656e2900" + |
| | | "6c646170733a2f2f6c6461702e69706c616e65742e636f6d3a343034312f7569643d6" + |
| | |
| | | "6c6570686f6e654e756d6265720001343532370034353331360067",dsList1, rsList1}, |
| | | {"1a0003343532370034353331360067343532370030000030002d32313131330062", dsList2, rsList2}, |
| | | {"1a012d34333600343933002d32323738393600020101f9f70001343532370034353331360067", dsList3, rsList1}, |
| | | {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, new ArrayList<RSInfo>()}, |
| | | {"1a0001343532370034353331360067", new ArrayList<DSInfo>(), rsList1}, |
| | | {"1a0000", new ArrayList<DSInfo>(), new ArrayList<RSInfo>()}, |
| | | {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, newList()}, |
| | | {"1a0001343532370034353331360067", newSet(), rsList1}, |
| | | {"1a0000", newSet(), newList()}, |
| | | {"1a0434313500313436003000040102020f026c646170733a2f2f686f73743a706f727" + |
| | | "42f64633d666f6f626172313f3f7375623f28736e3d416e6f7468657220456e747279" + |
| | | "203129006c646170733a2f2f686f73743a706f72742f64633d666f6f626172323f3f7" + |
| | |
| | | } |
| | | |
| | | @Test(dataProvider = "createTopologyData") |
| | | public void oldTopologyPDUs(String oldPdu, List<DSInfo> dsList, List<RSInfo> rsList) |
| | | public void oldTopologyPDUs(String oldPdu, Set<DSInfo> dsList, List<RSInfo> rsList) |
| | | throws Exception |
| | | { |
| | | TopologyMsg msg = new TopologyMsg(hexStringToByteArray(oldPdu), |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V3); |
| | | assertEquals(msg.getDsList(), dsList); |
| | | assertEquals(msg.getRsList(), rsList); |
| | | BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3)); |
| | | assertEquals(bi.toString(16), oldPdu); |
| | | Assertions.assertThat(new HashSet<DSInfo>(msg.getReplicaInfos().values())) |
| | | .isEqualTo(dsList); |
| | | assertEquals(msg.getRsInfos(), rsList); |
| | | if (msg.getReplicaInfos().values().equals(dsList)) |
| | | { |
| | | // Unfortunately this check does not work when the order of the |
| | | // replicaInfos collection is not exactly the same as the dsList |
| | | BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3)); |
| | | assertEquals(bi.toString(16), oldPdu); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name="createEntryMsgData") |
| | |
| | | int pos = 0; |
| | | int length = 2; |
| | | int msgid = 14; |
| | | Object[] set1 = new Object[] {sid, dest, entryBytes, pos, length, msgid}; |
| | | |
| | | return new Object [][] { set1}; |
| | | return new Object[][] { { sid, dest, entryBytes, pos, length, msgid } }; |
| | | } |
| | | |
| | | /** |
| | |
| | | int sender = 1; |
| | | int dest = 2; |
| | | Message message = ERR_UNKNOWN_TYPE.get("toto"); |
| | | Object[] set1 = new Object[] {sender, dest, message}; |
| | | return new Object [][] { set1}; |
| | | return new Object[][] { { sender, dest, message } }; |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | TopologyMsg msg = new TopologyMsg(dsList, rsList); |
| | | TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), |
| | | ProtocolVersion.getCurrentVersion()); |
| | | assertEquals(msg.getDsList(), newMsg.getDsList()); |
| | | assertEquals(msg.getRsList(), newMsg.getRsList()); |
| | | assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos()); |
| | | assertEquals(msg.getRsInfos(), newMsg.getRsInfos()); |
| | | } |
| | | |
| | | /** |
| | |
| | | private void waitForStableTopo(FakeReplicationDomain fakeRd, int expectedDs, |
| | | int expectedRs) throws Exception |
| | | { |
| | | List<DSInfo> dsInfo = null; |
| | | Map<Integer, DSInfo> dsInfo = null; |
| | | List<RSInfo> rsInfo = null; |
| | | long nSec = 0; |
| | | long startTime = System.currentTimeMillis(); |
| | | do |
| | | { |
| | | dsInfo = fakeRd.getReplicasList(); |
| | | rsInfo = fakeRd.getRsList(); |
| | | dsInfo = fakeRd.getReplicaInfos(); |
| | | rsInfo = fakeRd.getRsInfos(); |
| | | if (dsInfo.size() == expectedDs && rsInfo.size() == expectedRs) |
| | | { |
| | | debugInfo("waitForStableTopo: expected topo obtained after " + nSec + " second(s)."); |
| | |
| | | // DS must see expected numbers of DSs/RSs |
| | | final FakeReplicationDomain fakeRd1 = fakeRDs[1]; |
| | | waitForStableTopo(fakeRd1, 1, 1); |
| | | List<DSInfo> dsInfos = fakeRd1.getReplicasList(); |
| | | DSInfo dsInfo = dsInfos.get(0); |
| | | DSInfo dsInfo = fakeRd1.getReplicaInfos().get(FDS2_ID); |
| | | assertEquals(dsInfo.getDsId(), FDS2_ID); |
| | | assertEquals(dsInfo.getStatus(), ServerStatus.NORMAL_STATUS); |
| | | |
| | |
| | | } |
| | | |
| | | // Wait for DS2 being degraded |
| | | boolean error = true; |
| | | for (int count = 0; count < 12; count++) |
| | | { |
| | | dsInfos = fakeRd1.getReplicasList(); |
| | | if (dsInfos == null) |
| | | continue; |
| | | if (dsInfos.size() == 0) |
| | | continue; |
| | | dsInfo = dsInfos.get(0); |
| | | if ( (dsInfo.getDsId() == FDS2_ID) && |
| | | (dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS) ) |
| | | { |
| | | error = false; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | Thread.sleep(1000); |
| | | } |
| | | } |
| | | assertFalse(error, "DS2 not in degraded status"); |
| | | expectStatusForDS(fakeRd1, ServerStatus.DEGRADED_STATUS, FDS2_ID); |
| | | |
| | | Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked |
| | | assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4); |
| | |
| | | fakeRd2.startListenService(); |
| | | |
| | | // Wait for DS2 being back to normal |
| | | error = true; |
| | | for (int count = 0; count < 12; count++) |
| | | { |
| | | dsInfos = fakeRd1.getReplicasList(); |
| | | if (dsInfos == null) |
| | | continue; |
| | | if (dsInfos.size() == 0) |
| | | continue; |
| | | dsInfo = dsInfos.get(0); |
| | | if ( (dsInfo.getDsId() == FDS2_ID) && |
| | | (dsInfo.getStatus() == ServerStatus.NORMAL_STATUS) ) |
| | | { |
| | | error = false; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | Thread.sleep(1000); |
| | | } |
| | | } |
| | | assertFalse(error, "DS2 not back to normal status"); |
| | | expectStatusForDS(fakeRd1, ServerStatus.NORMAL_STATUS, FDS2_ID); |
| | | |
| | | // DS2 should also change status so reset its assured monitoring data so no received sr updates |
| | | assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5); |
| | |
| | | assertEquals(fakeRd2.getReceivedUpdates(), 6); |
| | | assertEquals(fakeRd2.getWrongReceivedUpdates(), 1); |
| | | assertFalse(fakeRd2.receivedUpdatesOk()); |
| | | } finally |
| | | } |
| | | finally |
| | | { |
| | | endTest(); |
| | | } |
| | | } |
| | | |
| | | private void expectStatusForDS(final ReplicationDomain domain, |
| | | ServerStatus expectedStatus, int dsId) throws InterruptedException |
| | | { |
| | | for (int count = 0; count < 12; count++) |
| | | { |
| | | final DSInfo dsInfo = domain.getReplicaInfos().get(dsId); |
| | | if (dsInfo != null && dsInfo.getStatus() == expectedStatus) |
| | | { |
| | | return; |
| | | } |
| | | Thread.sleep(1000); |
| | | } |
| | | Assert.fail("DS(" + dsId + ") did not have expected status " |
| | | + expectedStatus + " after 12 seconds"); |
| | | } |
| | | |
| | | private void assertContainsOnly(Map<Integer, Integer> map, int key, |
| | | int expectedValue) |
| | | { |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | final long generationId = getGenerationId(TEST_ROOT_DN); |
| | | broker = new ReplicationBroker(new DummyReplicationDomain(generationId), |
| | | state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort), |
| | | generationId, getReplSessionSecurity()); |
| | | getReplSessionSecurity()); |
| | | connect(broker, replicationServerPort, 5000); |
| | | |
| | | ReplicationMsg receivedMsg = broker.receive(); |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import java.util.*; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.service.ReplicationBroker.ReplicationServerInfo; |
| | | import org.opends.server.replication.service.ReplicationBroker.Topology; |
| | | import org.testng.Assert; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static java.util.Collections.*; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class ReplicationBrokerTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | private static enum TopologyCtorToUse |
| | | { |
| | | BUILD_WITH_TOPOLOGY_MSG, BUILD_WITH_DS_RS_LISTS; |
| | | } |
| | | |
| | | private final int CURRENT_RS_ID = 91; |
| | | private final int MISSING_RS_ID = 93; |
| | | private final int ANOTHER_RS_ID = 94; |
| | | private final DSInfo CURRENT_DS = dsInfo(11, CURRENT_RS_ID); |
| | | private final DSInfo OTHER_DS = dsInfo(12, CURRENT_RS_ID); |
| | | private final DSInfo MISSING_DS = dsInfo(13, CURRENT_RS_ID); |
| | | private final ReplicationServerInfo CURRENT_RS = rsInfo(CURRENT_RS_ID, |
| | | CURRENT_DS.getDsId(), OTHER_DS.getDsId()); |
| | | private final ReplicationServerInfo MISSING_RS = rsInfo(MISSING_RS_ID, |
| | | MISSING_DS.getDsId()); |
| | | private final ReplicationServerInfo ANOTHER_RS = rsInfo(ANOTHER_RS_ID); |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | private DSInfo dsInfo(int dsServerId, int rsServerId) |
| | | { |
| | | byte z = 0; |
| | | return new DSInfo(dsServerId, null, rsServerId, 0, null, |
| | | false, null, z, z, EMPTY_LIST, EMPTY_LIST, EMPTY_LIST, z); |
| | | } |
| | | |
| | | private ReplicationServerInfo rsInfo(int rsServerId, Integer... dsIds) |
| | | { |
| | | byte z = 0; |
| | | final RSInfo info = new RSInfo(rsServerId, rsServerId + ":1389", 0, z, 0); |
| | | return new ReplicationServerInfo(info, newSet(dsIds)); |
| | | } |
| | | |
| | | private Map<Integer, ReplicationServerInfo> newMap(ReplicationServerInfo... infos) |
| | | { |
| | | if (infos.length == 0) |
| | | { |
| | | return Collections.emptyMap(); |
| | | } |
| | | final Map<Integer, ReplicationServerInfo> map = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | for (ReplicationServerInfo info : infos) |
| | | { |
| | | map.put(info.getServerId(), info); |
| | | } |
| | | return map; |
| | | } |
| | | |
| | | private void assertInvariants(final Topology topo) |
| | | { |
| | | assertThat(topo.replicaInfos).doesNotContainKey(CURRENT_DS.getDsId()); |
| | | } |
| | | |
| | | private ReplicationServerInfo assertContainsRSWithDSs( |
| | | Map<Integer, ReplicationServerInfo> rsInfos, |
| | | ReplicationServerInfo rsInfo, Integer... connectedDSs) |
| | | { |
| | | return assertContainsRSWithDSs(rsInfos, rsInfo, newSet(connectedDSs)); |
| | | } |
| | | |
| | | private ReplicationServerInfo assertContainsRSWithDSs( |
| | | Map<Integer, ReplicationServerInfo> rsInfos, |
| | | ReplicationServerInfo rsInfo, Set<Integer> connectedDSs) |
| | | { |
| | | final ReplicationServerInfo info = find(rsInfos, rsInfo.toRSInfo()); |
| | | assertNotNull(info); |
| | | assertThat(info.getConnectedDSs()).containsAll(connectedDSs); |
| | | return info; |
| | | } |
| | | |
| | | private ReplicationServerInfo find(Map<Integer, ReplicationServerInfo> rsInfos, RSInfo rsInfo) |
| | | { |
| | | for (ReplicationServerInfo info : rsInfos.values()) |
| | | { |
| | | if (info.getServerId() == rsInfo.getId()) |
| | | { |
| | | return info; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | private Topology newTopology(TopologyCtorToUse toUse, |
| | | Map<Integer, DSInfo> replicaInfos, List<RSInfo> rsInfos, int dsServerId, int rsServerId, |
| | | Set<String> rsUrls, Map<Integer, ReplicationServerInfo> previousRSs) |
| | | { |
| | | if (TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG == toUse) |
| | | { |
| | | final TopologyMsg topologyMsg = new TopologyMsg(replicaInfos.values(), rsInfos); |
| | | return new Topology(topologyMsg, dsServerId, rsServerId, rsUrls, previousRSs); |
| | | } |
| | | else if (TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS == toUse) |
| | | { |
| | | return new Topology(replicaInfos, rsInfos, dsServerId, rsServerId, rsUrls, previousRSs); |
| | | } |
| | | Assert.fail("Do not know which Topology constructor to use: " + toUse); |
| | | return null; |
| | | } |
| | | |
| | | private Map<Integer, DSInfo> newMap(DSInfo... dsInfos) |
| | | { |
| | | final Map<Integer, DSInfo> results = new HashMap<Integer, DSInfo>(); |
| | | for (DSInfo dsInfo : dsInfos) |
| | | { |
| | | results.put(dsInfo.getDsId(), dsInfo); |
| | | } |
| | | return results; |
| | | } |
| | | |
| | | @DataProvider |
| | | public Object[][] topologyCtorProvider() { |
| | | return new Object[][] { { TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG }, |
| | | { TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS } }; |
| | | } |
| | | |
| | | @Test(dataProvider = "topologyCtorProvider") |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldContainNothing(TopologyCtorToUse toUse) |
| | | throws Exception |
| | | { |
| | | final Topology topo = newTopology(toUse, |
| | | EMPTY_MAP, EMPTY_LIST, |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).isEmpty(); |
| | | } |
| | | |
| | | @Test |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldFilterOutCurrentDS() |
| | | { |
| | | final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG, |
| | | newMap(OTHER_DS, CURRENT_DS), EMPTY_LIST, |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).isEmpty(); |
| | | } |
| | | |
| | | @Test(dataProvider = "topologyCtorProvider") |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldContainRSWithoutOtherDS(TopologyCtorToUse toUse) |
| | | { |
| | | final Topology topo = newTopology(toUse, |
| | | newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()), |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).hasSize(1); |
| | | assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId()); |
| | | } |
| | | |
| | | @Test |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldContainRSWithAllDSs_buildWithTopologyMsg() |
| | | { |
| | | final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG, |
| | | newMap(CURRENT_DS, OTHER_DS), newList(CURRENT_RS.toRSInfo()), |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).hasSize(1); |
| | | assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs()); |
| | | } |
| | | |
| | | @Test(dataProvider = "topologyCtorProvider") |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldStillContainRS(TopologyCtorToUse toUse) throws Exception |
| | | { |
| | | final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS); |
| | | final Topology topo = newTopology(toUse, |
| | | newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()), |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).hasSize(1); |
| | | assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs()); |
| | | } |
| | | |
| | | @Test(dataProvider = "topologyCtorProvider") |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldStillContainRSWithNewlyProvidedDSs(TopologyCtorToUse toUse) |
| | | { |
| | | final ReplicationServerInfo CURRENT_RS_WITHOUT_DS = rsInfo(CURRENT_RS_ID); |
| | | final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS_WITHOUT_DS); |
| | | final Topology topo = newTopology(toUse, |
| | | newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()), |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).hasSize(1); |
| | | assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId(), OTHER_DS.getDsId()); |
| | | } |
| | | |
| | | @Test(dataProvider = "topologyCtorProvider") |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldHaveRemovedMissingRS(TopologyCtorToUse toUse) |
| | | { |
| | | final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS, MISSING_RS); |
| | | final Topology topo = newTopology(toUse, |
| | | newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()), |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).hasSize(1); |
| | | assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs()); |
| | | } |
| | | |
| | | @Test |
| | | @SuppressWarnings("unchecked") |
| | | public void topologyShouldHaveStampedLocallyConfiguredRSs_buildWithDsRsLists() |
| | | { |
| | | final Set<String> locallyConfigured = newSet(CURRENT_RS.getServerURL()); |
| | | final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS, |
| | | newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo(), ANOTHER_RS.toRSInfo()), |
| | | CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), locallyConfigured, EMPTY_MAP); |
| | | assertInvariants(topo); |
| | | assertThat(topo.rsInfos).hasSize(2); |
| | | ReplicationServerInfo currentRS = |
| | | assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs()); |
| | | ReplicationServerInfo anotherRS = |
| | | assertContainsRSWithDSs(topo.rsInfos, ANOTHER_RS); |
| | | assertThat(currentRS.isLocallyConfigured()).isTrue(); |
| | | assertThat(anotherRS.isLocallyConfigured()).isFalse(); |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import java.util.*; |
| | | import java.util.Arrays; |
| | | import java.util.Map; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.service.ReplicationDomain.IEContext; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | |
| | | @SuppressWarnings("javadoc") |
| | | public class ReplicationDomainTest extends ReplicationTestCase |
| | | { |
| | | private static final Task NO_INIT_TASK = null; |
| | | |
| | | @DataProvider(name = "publishAndReceiveData") |
| | | public Object[][] createpublishAndReceiveData() |
| | | { |
| | |
| | | assertNotNull(rcvdMsg); |
| | | assertEquals(test, rcvdMsg.getPayload()); |
| | | |
| | | for (RSInfo replServerInfo : domain1.getRsList()) |
| | | for (RSInfo replServerInfo : domain1.getRsInfos()) |
| | | { |
| | | // The generation Id of the remote should be 1 |
| | | assertEquals(replServerInfo.getGenerationId(), 1, |
| | | "Unexpected value of generationId in RSInfo for RS=" + replServerInfo); |
| | | } |
| | | |
| | | for (DSInfo serverInfo : domain1.getReplicasList()) |
| | | for (DSInfo serverInfo : domain1.getReplicaInfos().values()) |
| | | { |
| | | assertEquals(serverInfo.getStatus(), ServerStatus.NORMAL_STATUS); |
| | | } |
| | |
| | | domain1.resetReplicationLog(); |
| | | Thread.sleep(500); |
| | | |
| | | for (RSInfo replServerInfo : domain1.getRsList()) |
| | | for (RSInfo replServerInfo : domain1.getRsInfos()) |
| | | { |
| | | // The generation Id of the remote should now be 2 |
| | | assertEquals(replServerInfo.getGenerationId(), 2, |
| | |
| | | { |
| | | try |
| | | { |
| | | assertExpectedServerStatuses(domain1.getReplicasList(), |
| | | assertExpectedServerStatuses(domain1.getReplicaInfos(), |
| | | domain1ServerId, domain2ServerId); |
| | | assertExpectedServerStatuses(domain2.getReplicasList(), |
| | | assertExpectedServerStatuses(domain2.getReplicaInfos(), |
| | | domain1ServerId, domain2ServerId); |
| | | |
| | | Map<Integer, ServerState> states1 = domain1.getReplicaStates(); |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertExpectedServerStatuses(List<DSInfo> dsInfos, |
| | | private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos, |
| | | int domain1ServerId, int domain2ServerId) |
| | | { |
| | | for (DSInfo serverInfo : dsInfos) |
| | | for (DSInfo serverInfo : dsInfos.values()) |
| | | { |
| | | if (serverInfo.getDsId() == domain2ServerId) |
| | | { |
| | | assertEquals(serverInfo.getStatus(), ServerStatus.BAD_GEN_ID_STATUS); |
| | | } |
| | | else |
| | | { |
| | | assertEquals(serverInfo.getDsId(), domain1ServerId); |
| | |
| | | * Trigger a total update from domain1 to domain2. |
| | | * Check that the exported data is correctly received on domain2. |
| | | */ |
| | | for (DSInfo remoteDS : domain2.getReplicasList()) |
| | | { |
| | | if (remoteDS.getDsId() != domain2.getServerId()) |
| | | { |
| | | domain2.initializeFromRemote(remoteDS.getDsId()); |
| | | break; |
| | | } |
| | | } |
| | | |
| | | assertTrue(initializeFromRemote(domain2)); |
| | | waitEndExport(exportedData, importedData); |
| | | assertExportSucessful(domain1, domain2, exportedData, importedData); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean initializeFromRemote(ReplicationDomain domain) throws DirectoryException |
| | | { |
| | | for (DSInfo remoteDS : domain.getReplicaInfos().values()) |
| | | { |
| | | if (remoteDS.getDsId() != domain.getServerId()) |
| | | { |
| | | domain.initializeFromRemote(remoteDS.getDsId(), NO_INIT_TASK); |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Test that a ReplicationDomain is able to export and import its database |
| | | * across 2 replication servers. |
| | |
| | | domain2 = new FakeReplicationDomain( |
| | | testService, 2, servers2, 0, null, importedData, 0); |
| | | |
| | | domain2.initializeFromRemote(1); |
| | | domain2.initializeFromRemote(1, NO_INIT_TASK); |
| | | |
| | | waitEndExport(exportedData, importedData); |
| | | assertExportSucessful(domain1, domain2, exportedData, importedData); |
| | |
| | | { |
| | | return ieContext.getLeftEntryCount(); |
| | | } |
| | | return 0; // import/export is finished |
| | | return 0; // import/export is finished |
| | | } |
| | | |
| | | /** |
| | |
| | | * Trigger a total update from domain1 to domain2. |
| | | * Check that the exported data is correctly received on domain2. |
| | | */ |
| | | boolean alone = true; |
| | | while (alone) |
| | | while (!initializeFromRemote(domain1)) |
| | | { |
| | | for (DSInfo remoteDS : domain1.getReplicasList()) |
| | | { |
| | | if (remoteDS.getDsId() != domain1.getServerId()) |
| | | { |
| | | alone = false; |
| | | domain1.initializeFromRemote(remoteDS.getDsId() , null); |
| | | break; |
| | | } |
| | | } |
| | | if (alone) |
| | | { |
| | | System.out.println("trying..."); |
| | | Thread.sleep(1000); |
| | | } |
| | | System.out.println("trying..."); |
| | | Thread.sleep(1000); |
| | | } |
| | | System.out.println("waiting"); |
| | | Thread.sleep(10000000); |